Re: Structured streaming from Kafka by timestamp

2019-02-05 Thread Cody Koeninger
To be more explicit, the easiest thing to do in the short term is use
your own instance of KafkaConsumer to get the offsets for the
timestamps you're interested in, using offsetsForTimes, and use those
for the start / end offsets.  See
https://kafka.apache.org/10/javadoc/?org/apache/kafka/clients/consumer/KafkaConsumer.html

Even if you are interested in implementing timestamp filter pushdown,
you need to get that basic usage working first, so I'd start there.

On Fri, Feb 1, 2019 at 11:08 AM Tomas Bartalos  wrote:
>
> Hello,
>
> sorry for my late answer.
> You're right, what I'm doing is a one time query, not a structured streaming. 
> Probably it will be best to describe my use case:
> I'd like to expose live data (via jdbc/odbc) residing in Kafka with the power 
> of spark's distributed sql engine. As jdbc server I use spark thrift server.
> Since timestamp pushdown is not possible :-(, this is a very cumbersome task.
> Let's say I want to inspect last 5 minutes of kafka. First I have to find out 
> offsetFrom per each partition that corresponds to now() - 5 minutes.
> Then I can register a kafka table:
>
> CREATE TABLE ticket_kafka_x USING kafka OPTIONS (kafka.bootstrap.servers 
> 'server1,server2,...',
>
> subscribe 'my_topic',
>
> startingOffsets '{"my_topic" : {"0" : 48532124, "1" : 49029703, "2" : 
> 49456213, "3" : 48400521}}');
>
>
> Then I can issue queries against this table (Data in Kafka is stored in Avro 
> format but I've created custom genericUDF to deserialize the data).
>
> select event.id as id, explode(event.picks) as picks from (
>
> select from_avro(value) as event from ticket_kafka_x where timestamp > 
> from_unixtime(unix_timestamp() - 5 * 60, "-MM-dd HH:mm:ss")
>
> ) limit 100;
>
>
> Whats even more irritating after few minutes I have to re-create this table 
> to reflect the last 5 minutes interval, otherwise the query performance would 
> suffer from increasing data to filter.
>
> Colleague of mine was able to make direct queries with timestamp pushdown in 
> latest Hive.
> How difficult is it to implement this feature in spark, could you lead me to 
> code where I could have a look ?
>
> Thank you,
>
>
> pi 25. 1. 2019 o 0:32 Shixiong(Ryan) Zhu  napísal(a):
>>
>> Hey Tomas,
>>
>> From your description, you just ran a batch query rather than a Structured 
>> Streaming query. The Kafka data source doesn't support filter push down 
>> right now. But that's definitely doable. One workaround here is setting 
>> proper  "startingOffsets" and "endingOffsets" options when loading from 
>> Kafka.
>>
>> Best Regards,
>>
>> Ryan
>>
>>
>> On Thu, Jan 24, 2019 at 10:15 AM Gabor Somogyi  
>> wrote:
>>>
>>> Hi Tomas,
>>>
>>> As a general note don't fully understand your use-case. You've mentioned 
>>> structured streaming but your query is more like a one-time SQL statement.
>>> Kafka doesn't support predicates how it's integrated with spark. What can 
>>> be done from spark perspective is to look for an offset for a specific 
>>> lowest timestamp and start the reading from there.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Thu, Jan 24, 2019 at 6:38 PM Tomas Bartalos  
>>> wrote:

 Hello,

 I'm trying to read Kafka via spark structured streaming. I'm trying to 
 read data within specific time range:

 select count(*) from kafka_table where timestamp > cast('2019-01-23 1:00' 
 as TIMESTAMP) and timestamp < cast('2019-01-23 1:01' as TIMESTAMP);


 The problem is that timestamp query is not pushed-down to Kafka, so Spark 
 tries to read the whole topic from beginning.


 explain query:

 

  +- *(1) Filter ((isnotnull(timestamp#57) && (timestamp#57 > 
 15351480)) && (timestamp#57 < 15352344))


 Scan KafkaRelation(strategy=Subscribe[keeper.Ticket.avro.v1---production], 
 start=EarliestOffsetRangeLimit, end=LatestOffsetRangeLimit) 
 [key#52,value#53,topic#54,partition#55,offset#56L,timestamp#57,timestampType#58]
  PushedFilters: [], ReadSchema: 
 struct>>>

 Obviously the query takes forever to complete. Is there a solution to this 
 ?

 I'm using kafka and kafka-client version 1.1.1


 BR,

 Tomas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Back pressure not working on streaming

2019-02-05 Thread Cody Koeninger
That article is pretty old, If you click through the link to the jira
mentioned in it, https://issues.apache.org/jira/browse/SPARK-18580 ,
it's been resolved.

On Wed, Jan 2, 2019 at 12:42 AM JF Chen  wrote:
>
> yes, 10 is a very low value for testing initial rate.
> And from this article 
> https://www.linkedin.com/pulse/enable-back-pressure-make-your-spark-streaming-production-lan-jiang/,
>  it seems spark back pressure is not available for dstream?
> So ,max rate per partition is the only available back pressure solution for 
> kafka dstream input?
>
> Regard,
> Junfeng Chen
>
>
> On Wed, Jan 2, 2019 at 11:49 AM HARSH TAKKAR  wrote:
>>
>> There is separate property for max rate , by default is is not set, so if 
>> you want to limit the max rate you should  provide that property  a value.
>>
>> Initial rate =10 means it will pick only 10 records per receiver in the 
>> batch interval when you start the process.
>>
>> Depending  upon the consumption rate it will increase  the consumption of 
>> records for processing in each batch.
>>
>> However i, feel 10 is way to low number for 32 partitioned kafka topic.
>>
>>
>>
>> Regards
>> Harsh
>> Happy New Year
>>
>> On Wed 2 Jan, 2019, 08:33 JF Chen >>
>>> I have set  spark.streaming.backpressure.enabled to true,  
>>> spark.streaming.backpressure.initialRate to 10.
>>> Once my application started, it received 32 million messages on first batch.
>>> My application runs every 300 seconds, with 32 kafka partition. So what's 
>>> is the max rate if I set initial rate to 10?
>>>
>>> Thanks!
>>>
>>>
>>> Regard,
>>> Junfeng Chen

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-31 Thread Cody Koeninger
Just to be 100% sure, when you're logging the group id in
createDirectStream, you no longer see any duplicates?

Regarding testing master, is the blocker that your spark cluster is on
2.3?  There's at least a reasonable chance that building an
application assembly jar that uses the master version just for the
spark-streaming-kafka-0-10 artifact will still work on a 2.3 cluster

On Fri, Aug 31, 2018 at 8:55 AM, Bryan Jeffrey  wrote:
> Cody,
>
> We are connecting to multiple clusters for each topic.  I did experiment
> this morning with both adding a cluster identifier to the group id, as well
> as simply moving to use only a single one of our clusters.  Neither of these
> were successful.  I am not able to run a test against master now.
>
> Regards,
>
> Bryan Jeffrey
>
>
>
>
> On Thu, Aug 30, 2018 at 2:56 PM Cody Koeninger  wrote:
>>
>> I doubt that fix will get backported to 2.3.x
>>
>> Are you able to test against master?  2.4 with the fix you linked to
>> is likely to hit code freeze soon.
>>
>> From a quick look at your code, I'm not sure why you're mapping over
>> an array of brokers.  It seems like that would result in different
>> streams with the same group id, because broker isn't part of your
>> group id string.
>>
>> On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey 
>> wrote:
>> > Hello, Spark Users.
>> >
>> > We have an application using Spark 2.3.0 and the 0.8 Kafka client.
>> > We're
>> > have a Spark streaming job, and we're reading a reasonable amount of
>> > data
>> > from Kafka (40 GB / minute or so).  We would like to move to using the
>> > Kafka
>> > 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers from having
>> > to
>> > modify formats.
>> >
>> > We've run into https://issues.apache.org/jira/browse/SPARK-19185,
>> > 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've
>> > tried to
>> > work around it as follows:
>> >
>> > 1. Disabled consumer caching.  This increased the total job time from ~1
>> > minute per batch to ~1.8 minutes per batch.  This performance penalty is
>> > unacceptable for our use-case. We also saw some partitions stop
>> > receiving
>> > for an extended period of time - I was unable to get a simple repro for
>> > this
>> > effect though.
>> > 2. Disabled speculation and multiple-job concurrency and added caching
>> > for
>> > the stream directly after reading from Kafka & caching offsets.  This
>> > approach seems to work well for simple examples (read from a Kafka
>> > topic,
>> > write to another topic). However, when we move through more complex
>> > logic we
>> > continue to see this type of error - despite only creating the stream
>> > for a
>> > given topic a single time.  We validated that we're creating the stream
>> > from
>> > a given topic / partition a single time by logging on stream creation,
>> > caching the stream and (eventually) calling 'runJob' to actually go and
>> > fetch the data. Nonetheless with multiple outputs we see the
>> > ConcurrentModificationException.
>> >
>> > I've included some code down below.  I would be happy if anyone had
>> > debugging tips for the workaround.  However, my main concern is to
>> > ensure
>> > that the 2.4 version will have a bug fix that will work for Spark
>> > Streaming
>> > in which multiple input topics map data to multiple outputs. I would
>> > also
>> > like to understand if the fix
>> > (https://github.com/apache/spark/pull/20997)
>> > will be backported to Spark 2.3.x
>> >
>> > In our code, read looks like the following:
>> >
>> > case class StreamLookupKey(topic: Set[String], brokers: String)
>> >
>> > private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] =
>> > Map()
>> >
>> > // Given inputs return a direct stream.
>> > def createDirectStream(ssc: StreamingContext,
>> >additionalKafkaParameters: Map[String, String],
>> >brokersToUse: Array[String], //
>> > broker1,broker2|broker3,broker4
>> >topicsToUse: Array[String],
>> >applicationName: String,
>> >persist: Option[PersistenceManager],
>> >useOldestOffsets: Boolean,
>> >maxRatePerPartition: Long,

Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-30 Thread Cody Koeninger
You're using an older version of spark, with what looks like a
manually included different version of the kafka-clients jar (1.0)
than what that version of the spark connector was written to depend on
(0.10.0.1), so there's no telling what's going on.

On Wed, Aug 29, 2018 at 3:40 PM, Guillermo Ortiz Fernández
 wrote:
> I can't... do you think that it's a possible bug of this version?? from
> Spark or Kafka?
>
> El mié., 29 ago. 2018 a las 22:28, Cody Koeninger ()
> escribió:
>>
>> Are you able to try a recent version of spark?
>>
>> On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
>>  wrote:
>> > I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
>> > exception and Spark dies.
>> >
>> > I couldn't see any error or problem among the machines, anybody has the
>> > reason about this error?
>> >
>> >
>> > java.lang.IllegalStateException: This consumer has already been closed.
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
>> > ~[kafka-clients-1.0.0.jar:na]
>> > at
>> >
>> > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
>> > ~[kafka-clients-1.0.0.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
>> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
>> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>> > at
>> >
>> > org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
>> > ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at scala.Option.orElse(Option.scala:289)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>> > ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
>> > at
>> >
>> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>> > ~[scala-library-2.11.11.jar:na]
>> > at
>> >
>> > scala.collection.mutable.ResizableArray$class.foreach(Resi

Re: ConcurrentModificationExceptions with CachedKafkaConsumers

2018-08-30 Thread Cody Koeninger
I doubt that fix will get backported to 2.3.x

Are you able to test against master?  2.4 with the fix you linked to
is likely to hit code freeze soon.

>From a quick look at your code, I'm not sure why you're mapping over
an array of brokers.  It seems like that would result in different
streams with the same group id, because broker isn't part of your
group id string.

On Thu, Aug 30, 2018 at 12:27 PM, Bryan Jeffrey  wrote:
> Hello, Spark Users.
>
> We have an application using Spark 2.3.0 and the 0.8 Kafka client.  We're
> have a Spark streaming job, and we're reading a reasonable amount of data
> from Kafka (40 GB / minute or so).  We would like to move to using the Kafka
> 0.10 client to avoid requiring our (0.10.2.1) Kafka brokers from having to
> modify formats.
>
> We've run into https://issues.apache.org/jira/browse/SPARK-19185,
> 'ConcurrentModificationExceptions with CachedKafkaConsumers'.  I've tried to
> work around it as follows:
>
> 1. Disabled consumer caching.  This increased the total job time from ~1
> minute per batch to ~1.8 minutes per batch.  This performance penalty is
> unacceptable for our use-case. We also saw some partitions stop receiving
> for an extended period of time - I was unable to get a simple repro for this
> effect though.
> 2. Disabled speculation and multiple-job concurrency and added caching for
> the stream directly after reading from Kafka & caching offsets.  This
> approach seems to work well for simple examples (read from a Kafka topic,
> write to another topic). However, when we move through more complex logic we
> continue to see this type of error - despite only creating the stream for a
> given topic a single time.  We validated that we're creating the stream from
> a given topic / partition a single time by logging on stream creation,
> caching the stream and (eventually) calling 'runJob' to actually go and
> fetch the data. Nonetheless with multiple outputs we see the
> ConcurrentModificationException.
>
> I've included some code down below.  I would be happy if anyone had
> debugging tips for the workaround.  However, my main concern is to ensure
> that the 2.4 version will have a bug fix that will work for Spark Streaming
> in which multiple input topics map data to multiple outputs. I would also
> like to understand if the fix (https://github.com/apache/spark/pull/20997)
> will be backported to Spark 2.3.x
>
> In our code, read looks like the following:
>
> case class StreamLookupKey(topic: Set[String], brokers: String)
>
> private var streamMap: Map[StreamLookupKey, DStream[DecodedData]] = Map()
>
> // Given inputs return a direct stream.
> def createDirectStream(ssc: StreamingContext,
>additionalKafkaParameters: Map[String, String],
>brokersToUse: Array[String], //
> broker1,broker2|broker3,broker4
>topicsToUse: Array[String],
>applicationName: String,
>persist: Option[PersistenceManager],
>useOldestOffsets: Boolean,
>maxRatePerPartition: Long,
>batchSeconds: Int
>   ): DStream[DecodedData] = {
>   val streams: Array[DStream[DecodedData]] =
> brokersToUse.map(brokers => {
>   val groupId = s"${applicationName}~${topicsToUse.mkString("~")}"
>   val kafkaParameters: Map[String, String] = getKafkaParameters(brokers,
> useOldestOffsets, groupId) ++ additionalKafkaParameters
>   logger.info(s"Kafka Params: ${kafkaParameters}")
>   val topics = topicsToUse.toSet
>   logger.info(s"Creating Kafka direct connection -
> ${kafkaParameters.mkString(GeneralConstants.comma)} " +
> s"topics: ${topics.mkString(GeneralConstants.comma)} w/
> applicationGroup: ${groupId}")
>
>   streamMap.getOrElse(StreamLookupKey(topics, brokers),
> createKafkaStream(ssc, applicationName, topics, brokers,
> maxRatePerPartition, batchSeconds, kafkaParameters))
> })
>
>   ssc.union(streams)
> }
>
> private def createKafkaStream(ssc: StreamingContext, applicationName:
> String, topics: Set[String], brokers: String,
>   maxRatePerPartition: Long, batchSeconds: Int,
> kafkaParameters: Map[String,String]): DStream[DecodedData] = {
>   logger.info(s"Creating a stream from Kafka for application
> ${applicationName} w/ topic ${topics} and " +
> s"brokers: ${brokers.split(',').head} with parameters:
> ${kafkaParameters.mkString("|")}")
>   try {
> val consumerStrategy = ConsumerStrategies.Subscribe[String,
> DecodedData](topics.toSeq, kafkaParameters)
> val stream: InputDStream[ConsumerRecord[String, DecodedData]] =
>   KafkaUtils.createDirectStream(ssc, locationStrategy =
> LocationStrategies.PreferBrokers, consumerStrategy = consumerStrategy)
>
> KafkaStreamFactory.writeStreamOffsets(applicationName, brokers, stream,
> maxRatePerPartition, batchSeconds)
> val result =
> 

Re: Spark Streaming - Kafka. java.lang.IllegalStateException: This consumer has already been closed.

2018-08-29 Thread Cody Koeninger
Are you able to try a recent version of spark?

On Wed, Aug 29, 2018 at 2:10 AM, Guillermo Ortiz Fernández
 wrote:
> I'm using Spark Streaming 2.0.1 with Kafka 0.10, sometimes I get this
> exception and Spark dies.
>
> I couldn't see any error or problem among the machines, anybody has the
> reason about this error?
>
>
> java.lang.IllegalStateException: This consumer has already been closed.
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1787)
> ~[kafka-clients-1.0.0.jar:na]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1091)
> ~[kafka-clients-1.0.0.jar:na]
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:169)
> ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:188)
> ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:215)
> ~[spark-streaming-kafka-0-10_2.11-2.0.2.jar:2.0.2]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at scala.Option.orElse(Option.scala:289) ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> ~[scala-library-2.11.11.jar:na]
> at
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> ~[scala-library-2.11.11.jar:na]
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> ~[scala-library-2.11.11.jar:na]
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> ~[scala-library-2.11.11.jar:na]
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> ~[scala-library-2.11.11.jar:na]
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:249)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:247)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at scala.util.Try$.apply(Try.scala:192) ~[scala-library-2.11.11.jar:na]
> at
> org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at
> org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> ~[spark-streaming_2.11-2.0.2.15.jar:2.0.2.15]
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Cody Koeninger
Offsets are loaded when you instantiate an
org.apache.kafka.clients.consumer.KafkaConsumer, subscribe, and poll.
There's not an explicit api for it.  Have you looked at the output of
kafka-consumer-groups.sh and tried the example code I linked to?


bash-3.2$ ./bin/kafka-consumer-groups.sh --bootstrap-server
localhost:9092 --group commitexample --describe
Note: This will only show information about consumers that use the
Java consumer API (non-ZooKeeper-based consumers).
Consumer group 'commitexample' has no active members.
TOPIC  PARTITION  CURRENT-OFFSET
LOG-END-OFFSET  LAGCONSUMER-ID
  HOST   CLIENT-ID
test   0  10561656
   600- -


scala> val c = new KafkaConsumer[String, String](kafkaParams.asJava)
c: org.apache.kafka.clients.consumer.KafkaConsumer[String,String] =
org.apache.kafka.clients.consumer.KafkaConsumer@780cbdf8
scala> c.subscribe(java.util.Arrays.asList("test"))
scala> c.poll(0)
scala> c.position(new TopicPartition("test", 0))
res4: Long = 1056






On Thu, Jun 14, 2018 at 3:33 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Where is that called in the driver? The only call I see from Subscribe is to
> load the offset from checkpoint.
>
> Get Outlook for Android
>
> 
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:24:58 PM
>
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The code that loads offsets from kafka is in e.g.
> org.apache.kafka.clients.consumer, it's not in spark.
>
> On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey 
> wrote:
>> Cody,
>>
>> Can you point me to the code that loads offsets? As far as I can see with
>> Spark 2.1, the only offset load is from checkpoint.
>>
>> Thank you!
>>
>> Bryan
>>
>> Get Outlook for Android
>>
>> 
>> From: Cody Koeninger 
>> Sent: Thursday, June 14, 2018 4:00:31 PM
>> To: Bryan Jeffrey
>> Cc: user
>> Subject: Re: Kafka Offset Storage: Fetching Offsets
>>
>> The expectation is that you shouldn't have to manually load offsets
>> from kafka, because the underlying kafka consumer on the driver will
>> start at the offsets associated with the given group id.
>>
>> That's the behavior I see with this example:
>>
>>
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>>
>> What does bin/kafka-consumer-groups.sh show for your group id?
>>
>> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
>> wrote:
>>> Hello.
>>>
>>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based
>>> on
>>> the documentation
>>>
>>>
>>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>>> it appears that you can now use Kafka itself to store offsets.
>>>
>>> I've setup a  simple Kafka DStream:
>>> val kafkaParameters = Map[String, String](
>>>   "metadata.broker.list" -> brokers,
>>>   "auto.offset.reset" -> "latest",
>>>   "enable.auto.commit" -> false.toString,
>>>   "key.deserializer" ->
>>>
>>>
>>> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>>   "partition.assignment.strategy" ->
>>>
>>>
>>> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>>   "group.id" -> applicationName
>>> )
>>>
>>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>>> DecodedData](topics.toSeq, kafkaParameters)
>>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>>
>>>
>>> I then commit the offsets:
>>>
>>> var offsets: Array[OffsetRange] = Array()
>>> stream.foreachRDD(rdd => {
>>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>>> })
>>>
>>> // Future: Move this after we've done processing.
>>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>>
>>> The offsets appear to commit successfully. However, on restart the
>>> streaming
>>> application consistently starts from latest whenever the Spark checkpoint
>>> is
>>> changed.  Drilling into the code it does not appear that re-loading
>>> offset
>>> data is supported in the Spark Streaming Kafka library.  How is this
>>> expected to work?  Is there an example of saving the offsets to Kafka and
>>> then loading them from Kafka?
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Cody Koeninger
The code that loads offsets from kafka is in e.g.
org.apache.kafka.clients.consumer, it's not in spark.

On Thu, Jun 14, 2018 at 3:22 PM, Bryan Jeffrey  wrote:
> Cody,
>
> Can you point me to the code that loads offsets? As far as I can see with
> Spark 2.1, the only offset load is from checkpoint.
>
> Thank you!
>
> Bryan
>
> Get Outlook for Android
>
> ____
> From: Cody Koeninger 
> Sent: Thursday, June 14, 2018 4:00:31 PM
> To: Bryan Jeffrey
> Cc: user
> Subject: Re: Kafka Offset Storage: Fetching Offsets
>
> The expectation is that you shouldn't have to manually load offsets
> from kafka, because the underlying kafka consumer on the driver will
> start at the offsets associated with the given group id.
>
> That's the behavior I see with this example:
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala
>
> What does bin/kafka-consumer-groups.sh show for your group id?
>
> On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey 
> wrote:
>> Hello.
>>
>> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based
>> on
>> the documentation
>>
>> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
>> it appears that you can now use Kafka itself to store offsets.
>>
>> I've setup a  simple Kafka DStream:
>> val kafkaParameters = Map[String, String](
>>   "metadata.broker.list" -> brokers,
>>   "auto.offset.reset" -> "latest",
>>   "enable.auto.commit" -> false.toString,
>>   "key.deserializer" ->
>>
>> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>>   "partition.assignment.strategy" ->
>>
>> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>>   "bootstrap.servers" -> brokersToUse.mkString(","),
>>   "group.id" -> applicationName
>> )
>>
>> val consumerStrategy = ConsumerStrategies.Subscribe[String,
>> DecodedData](topics.toSeq, kafkaParameters)
>> KafkaUtils.createDirectStream(ssc, locationStrategy =
>> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>>
>>
>> I then commit the offsets:
>>
>> var offsets: Array[OffsetRange] = Array()
>> stream.foreachRDD(rdd => {
>>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   logger.info(s"Offsets: ${offsets.mkString("|")}")
>> })
>>
>> // Future: Move this after we've done processing.
>> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>
>> The offsets appear to commit successfully. However, on restart the
>> streaming
>> application consistently starts from latest whenever the Spark checkpoint
>> is
>> changed.  Drilling into the code it does not appear that re-loading offset
>> data is supported in the Spark Streaming Kafka library.  How is this
>> expected to work?  Is there an example of saving the offsets to Kafka and
>> then loading them from Kafka?
>>
>> Regards,
>>
>> Bryan Jeffrey

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka Offset Storage: Fetching Offsets

2018-06-14 Thread Cody Koeninger
The expectation is that you shouldn't have to manually load offsets
from kafka, because the underlying kafka consumer on the driver will
start at the offsets associated with the given group id.

That's the behavior I see with this example:

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/CommitAsync.scala

What does bin/kafka-consumer-groups.sh show for your group id?

On Thu, Jun 14, 2018 at 10:25 AM, Bryan Jeffrey  wrote:
> Hello.
>
> I am using Spark 2.1 and Kafka 0.10.2.1 and the DStream interface.  Based on
> the documentation
> (https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself),
> it appears that you can now use Kafka itself to store offsets.
>
> I've setup a  simple Kafka DStream:
> val kafkaParameters = Map[String, String](
>   "metadata.broker.list" -> brokers,
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> false.toString,
>   "key.deserializer" ->
> classOf[org.apache.kafka.common.serialization.StringDeserializer].getCanonicalName,
>   "value.deserializer" -> classOf[MyDecoder].getCanonicalName,
>   "partition.assignment.strategy" ->
> classOf[org.apache.kafka.clients.consumer.RoundRobinAssignor].getCanonicalName,
>   "bootstrap.servers" -> brokersToUse.mkString(","),
>   "group.id" -> applicationName
> )
>
> val consumerStrategy = ConsumerStrategies.Subscribe[String,
> DecodedData](topics.toSeq, kafkaParameters)
> KafkaUtils.createDirectStream(ssc, locationStrategy =
> LocationStrategies.PreferConsistent, consumerStrategy = consumerStrategy)
>
>
> I then commit the offsets:
>
> var offsets: Array[OffsetRange] = Array()
> stream.foreachRDD(rdd => {
>   offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   logger.info(s"Offsets: ${offsets.mkString("|")}")
> })
>
> // Future: Move this after we've done processing.
> stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>
> The offsets appear to commit successfully. However, on restart the streaming
> application consistently starts from latest whenever the Spark checkpoint is
> changed.  Drilling into the code it does not appear that re-loading offset
> data is supported in the Spark Streaming Kafka library.  How is this
> expected to work?  Is there an example of saving the offsets to Kafka and
> then loading them from Kafka?
>
> Regards,
>
> Bryan Jeffrey

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Structured-Streaming][Beginner] Out of order messages with Spark kafka readstream from a specific partition

2018-05-10 Thread Cody Koeninger
As long as you aren't doing any spark operations that involve a
shuffle, the order you see in spark should be the same as the order in
the partition.

Can you link to a minimal code example that reproduces the issue?

On Wed, May 9, 2018 at 7:05 PM, karthikjay  wrote:
> On the producer side, I make sure data for a specific user lands on the same
> partition. On the consumer side, I use a regular Spark kafka readstream and
> read the data. I also use a console write stream to print out the spark
> kafka DataFrame. What I observer is, the data for a specific user (even
> though in the same partition) arrives out of order in the console.
>
> I also verified the data ordering by running a simple Kafka consumer in Java
> and the data seems to be ordered. What am I missing here ?
>
> Thanks,
> JK
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Structured streaming: Tried to fetch $offset but the returned record offset was ${record.offset}"

2018-04-17 Thread Cody Koeninger
Is this possibly related to the recent post on
https://issues.apache.org/jira/browse/SPARK-18057 ?

On Mon, Apr 16, 2018 at 11:57 AM, ARAVIND SETHURATHNAM <
asethurath...@homeaway.com.invalid> wrote:

> Hi,
>
> We have several structured streaming jobs (spark version 2.2.0) consuming
> from kafka and writing to s3. They were running fine for a month, since
> yesterday few jobs started failing and I see the below exception in the
> failed jobs  log,
>
>
>
> ```Tried to fetch 473151075 but the returned record offset was 473151072```
> ```GScheduler: ResultStage 0 (start at SparkStreamingTask.java:222) failed
> in 77.546 s due to Job aborted due to stage failure: Task 86 in stage 0.0
> failed 4 times, most recent failure: Lost task 86.3 in stage 0.0 (TID 96,
> ip-10-120-12-52.ec2.internal, executor 11): java.lang.IllegalStateException:
> Tried to fetch 473151075 but the returned record offset was 473151072
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.fetchData(
> CachedKafkaConsumer.scala:234)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(
> CachedKafkaConsumer.scala:106)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.
> getNext(KafkaSourceRDD.scala:158)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.
> getNext(KafkaSourceRDD.scala:149)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$
> GeneratedIterator.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.
> hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$
> anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.convert.Wrappers$IteratorWrapper.
> hasNext(Wrappers.scala:30)
> `
>
>
>
> can someone provide some direction what could be causing this all of a
> sudden when consuming from those topics?
>
>
>
> regards
>
> [image: https://ssl.gstatic.com/ui/v1/icons/mail/images/cleardot.gif]
>
> Aravind
>
>
>


Re: is it possible to use Spark 2.3.0 along with Kafka 0.9.0.1?

2018-03-16 Thread Cody Koeninger
Should be able to use the 0.8 kafka dstreams with a kafka 0.9 broker

On Fri, Mar 16, 2018 at 7:52 AM, kant kodali  wrote:
> Hi All,
>
> is it possible to use Spark 2.3.0 along with Kafka 0.9.0.1?
>
> Thanks,
> kant

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: KafkaUtils.createStream(..) is removed for API

2018-02-19 Thread Cody Koeninger
I can't speak for committers, but my guess is it's more likely for
DStreams in general to stop being supported before that particular
integration is removed.

On Sun, Feb 18, 2018 at 9:34 PM, naresh Goud  wrote:
> Thanks Ted.
>
> I see  createDirectStream is experimental as annotated with
> "org.apache.spark.annotation.Experimental".
>
> Is it possible to be this API will be removed in future?  because we wanted
> to use this API in one of our production jobs. afraid if it will not be
> supported in future.
>
> Thank you,
> Naresh
>
>
>
>
> On Sun, Feb 18, 2018 at 7:47 PM, Ted Yu  wrote:
>>
>> createStream() is still in
>> external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
>> But it is not in
>> external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala
>>
>> FYI
>>
>> On Sun, Feb 18, 2018 at 5:17 PM, naresh Goud 
>> wrote:
>>>
>>> Hello Team,
>>>
>>> I see "KafkaUtils.createStream() " method not available in spark 2.2.1.
>>>
>>> Can someone please confirm if these methods are removed?
>>>
>>> below is my pom.xml entries.
>>>
>>>
>>> 
>>>   2.11.8
>>>   2.11
>>> 
>>>
>>>
>>>   
>>>   org.apache.spark
>>>   spark-streaming_${scala.tools.version}
>>>   2.2.1
>>>   provided
>>>   
>>> 
>>>   org.apache.spark
>>>   spark-streaming-kafka-0-10_2.11
>>>   2.2.1
>>>   provided
>>> 
>>> 
>>>   org.apache.spark
>>>   spark-core_2.11
>>>   2.2.1
>>>   provided
>>> 
>>>   
>>>
>>>
>>>
>>>
>>>
>>> Thank you,
>>> Naresh
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Providing Kafka configuration as Map of Strings

2018-01-24 Thread Cody Koeninger
Have you tried passing in a Map that happens to have
string for all the values?  I haven't tested this, but the underlying
kafka consumer constructor is documented to take either strings or
objects as values, despite the static type.

On Wed, Jan 24, 2018 at 2:48 PM, Tecno Brain
 wrote:
> Basically, I am trying to avoid writing code like:
>
>   switch( key ) {
> case "key.deserializer" :  result.put(key ,
> Class.forName(value)); break;
> case "key.serializer"   :  result.put(key ,
> Class.forName(value)); break;
> case "value.deserializer" :  result.put(key ,
> Class.forName(value)); break;
> case "value.serializer"   :  result.put(key ,
> Class.forName(value)); break;
> case "max.partition.fetch.bytes" : result.put(key,
> Long.valueOf(value)); break;
> case "max.poll.interval.ms" : result.put(key,
> Long.valueOf(value)); break;
> case "enable.auto.commit" : result.put(key,
> Boolean.valueOf(value)); break;
> default:
> result.put(key, value);
> break;
> }
>
> since I would need to go over all possible Kafka properties that are not
> expected as a String.
>
> On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain 
> wrote:
>>
>> On page
>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>> there is this Java example:
>>
>> Map kafkaParams = new HashMap<>();
>> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
>> kafkaParams.put("key.deserializer", StringDeserializer.class);
>> kafkaParams.put("value.deserializer", StringDeserializer.class);
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>> kafkaParams.put("auto.offset.reset", "latest");
>> kafkaParams.put("enable.auto.commit", false);
>>
>> Collection topics = Arrays.asList("topicA", "topicB");
>>
>> JavaInputDStream> stream =
>>   KafkaUtils.createDirectStream(
>> streamingContext,
>> LocationStrategies.PreferConsistent(),
>> ConsumerStrategies.Subscribe(topics, kafkaParams)
>>   );
>>
>> I would like to configure Kafka from properties loaded from a Properties
>> file or a Map.
>>
>> Is there any API to take a Map and produce the required
>> Map required to set the Kafka parameters ? Such code would
>> convert "true" to a boolean, or a class name to the Class depending on the
>> key.
>>
>> Seems to me that I would need to know ALL possible Kafka parameters and
>> what data type they should be converted to in order to produce the
>> Map kafkaParams.
>>
>> The older API used a Map passed to the
>> KafkaUtils.createDirectStream
>>
>> Thanks
>>
>>
>>
>>
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: uncontinuous offset in kafka will cause the spark streamingfailure

2018-01-24 Thread Cody Koeninger
When you say the patch is not suitable, can you clarify why?

Probably best to get the various findings centralized on
https://issues.apache.org/jira/browse/SPARK-17147

Happy to help with getting the patch up to date and working.

On Wed, Jan 24, 2018 at 1:19 AM, namesuperwood <namesuperw...@gmail.com> wrote:
> It seems this patch is not suitable for our problem。
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
>
> wood.super
>
>  原始邮件
> 发件人: namesuperwood<namesuperw...@gmail.com>
> 收件人: Justin Miller<justin.mil...@protectwise.com>
> 抄送: user<user@spark.apache.org>; Cody Koeninger<c...@koeninger.org>
> 发送时间: 2018年1月24日(周三) 14:45
> 主题: Re: uncontinuous offset in kafka will cause the spark streamingfailure
>
> Yes. My spark streaming application works with uncompacted topic. I will
> check the patch.
>
>
> wood.super
>
>  原始邮件
> 发件人: Justin Miller<justin.mil...@protectwise.com>
> 收件人: namesuperwood<namesuperw...@gmail.com>
> 抄送: user<user@spark.apache.org>; Cody Koeninger<c...@koeninger.org>
> 发送时间: 2018年1月24日(周三) 14:23
> 主题: Re: uncontinuous offset in kafka will cause the spark streamingfailure
>
> We appear to be kindred spirits, I’ve recently run into the same issue. Are
> you running compacted topics? I’ve run into this issue on non-compacted
> topics as well, it happens rarely but is still a pain. You might check out
> this patch and related spark streaming Kafka ticket:
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
> https://issues.apache.org/jira/browse/SPARK-17147
>
> I’ll be testing out the patch on somewhat large scale stream processor
> tomorrow.
>
> CCing: Cody Koeninger
>
> Best,
> Justin
>
> On Jan 23, 2018, at 10:48 PM, namesuperwood <namesuperw...@gmail.com> wrote:
>
> Hi all
>
> kafka version :  kafka_2.11-0.11.0.2
>spark version :  2.0.1
>
> A topic-partition "adn-tracking,15"  in kafka  who's   earliest offset is
> 1255644602 and  latest offset is 1271253441.
>
> While starting a spark streaming to process the data from the topic ,  we
> got a exception with "Got wrong record   even after seeking to offset
> 1266921577”.  [   (earliest offset) 1255644602 < 1266921577   <
> 1271253441 ( latest offset ) ]
>
> Finally, I found the following source code in class CachedKafkaCounsumer
> from spark-streaming. This is obviously due to the fact that the offset from
> consumer poll and the offset which the comsuner seek is not equal.
>
>
> Here is the “ CachedKafkaCounsumer.scala” code:
>
> def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>
> logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested
> $offset") if (offset != nextOffset) {
>
> logInfo(s"Initial fetch for $groupId $topic $partition $offset")
> seek(offset) poll(timeout) }
>
> if (!buffer.hasNext()) { poll(timeout) }
> assert(buffer.hasNext(),
>   s"Failed to get records for $groupId $topic $partition $offset after
> polling for $timeout")
> var record = buffer.next()
>
> if (record.offset != offset) {
>   logInfo(s"Buffer miss for $groupId $topic $partition $offset")
>   seek(offset)
>   poll(timeout)
>   assert(buffer.hasNext(),
> s"Failed to get records for $groupId $topic $partition $offset after
> polling for $timeout")
>   record = buffer.next()
>   assert(record.offset == offset,
> s"Got wrong record for $groupId $topic $partition even after seeking to
> offset $offset")
> }
>
> nextOffset = offset + 1
> record
>
> }
>
> I reproduce this problem, and found out that offset from one
> topicAndPartition is uncontinuous in Kafka。I think this is a bug that needs
> to be repaired.
>
> I  implemented a simple project to use consumer to  seek offset 1266921577.
> But it return the offset 1266921578. Then while  seek to 1266921576, it
> return the 1266921576 exactly。
>
>
>
>
>
> There is the code:
>
> public class consumerDemo {
>
> public static void main(String[] argv){
>
> Properties props = new Properties();
> props.put("bootstrap.servers", "172.31.29.31:9091");
> props.put("group.id", "consumer-tutorial-demo");
> props.put("key.deserializer", StringDeserializer.class.getName());
> props.put("value.deserializer", StringDeserializer.class.getName());
> KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
> String>(props);
> TopicPartition tp = new TopicPar

Re: "Got wrong record after seeking to offset" issue

2018-01-18 Thread Cody Koeninger
https://kafka.apache.org/documentation/#compaction

On Thu, Jan 18, 2018 at 1:17 AM, Justin Miller
<justin.mil...@protectwise.com> wrote:
> By compacted do you mean compression? If so then we did recently turn on lz4
> compression. If there’s another meaning if there’s a command I can run to
> check compaction I’m happy to give that a shot too.
>
> I’ll try consuming from the failed offset if/when the problem manifests
> itself again.
>
> Thanks!
> Justin
>
>
> On Wednesday, January 17, 2018, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> That means the consumer on the executor tried to seek to the specified
>> offset, but the message that was returned did not have a matching
>> offset.  If the executor can't get the messages the driver told it to
>> get, something's generally wrong.
>>
>> What happens when you try to consume the particular failing offset
>> from another  (e.g. commandline) consumer?
>>
>> Is the topic in question compacted?
>>
>>
>>
>> On Tue, Jan 16, 2018 at 11:10 PM, Justin Miller
>> <justin.mil...@protectwise.com> wrote:
>> > Greetings all,
>> >
>> > I’ve recently started hitting on the following error in Spark Streaming
>> > in Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms
>> > even to five minutes doesn’t seem to be helping. The problem only 
>> > manifested
>> > in the last few days, restarting with a new consumer group seems to remedy
>> > the issue for a few hours (< retention, which is 12 hours).
>> >
>> > Error:
>> > Caused by: java.lang.AssertionError: assertion failed: Got wrong record
>> > for spark-executor-  76 even after seeking to
>> > offset 1759148155
>> > at scala.Predef$.assert(Predef.scala:170)
>> > at
>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
>> > at
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
>> > at
>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
>> > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>> >
>> > I guess my questions are, why is that assertion a job killer vs a
>> > warning and is there anything I can tweak settings wise that may keep it at
>> > bay.
>> >
>> > I wouldn’t be surprised if this issue were exacerbated by the volume we
>> > do on Kafka topics (~150k/sec on the persister that’s crashing).
>> >
>> > Thank you!
>> > Justin
>> >
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: "Got wrong record after seeking to offset" issue

2018-01-17 Thread Cody Koeninger
That means the consumer on the executor tried to seek to the specified
offset, but the message that was returned did not have a matching
offset.  If the executor can't get the messages the driver told it to
get, something's generally wrong.

What happens when you try to consume the particular failing offset
from another  (e.g. commandline) consumer?

Is the topic in question compacted?



On Tue, Jan 16, 2018 at 11:10 PM, Justin Miller
 wrote:
> Greetings all,
>
> I’ve recently started hitting on the following error in Spark Streaming in 
> Kafka. Adjusting maxRetries and spark.streaming.kafka.consumer.poll.ms even 
> to five minutes doesn’t seem to be helping. The problem only manifested in 
> the last few days, restarting with a new consumer group seems to remedy the 
> issue for a few hours (< retention, which is 12 hours).
>
> Error:
> Caused by: java.lang.AssertionError: assertion failed: Got wrong record for 
> spark-executor-  76 even after seeking to 
> offset 1759148155
> at scala.Predef$.assert(Predef.scala:170)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:85)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>
> I guess my questions are, why is that assertion a job killer vs a warning and 
> is there anything I can tweak settings wise that may keep it at bay.
>
> I wouldn’t be surprised if this issue were exacerbated by the volume we do on 
> Kafka topics (~150k/sec on the persister that’s crashing).
>
> Thank you!
> Justin
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Which kafka client to use with spark streaming

2017-12-26 Thread Cody Koeninger
Do not add a dependency on kafka-clients, the spark-streaming-kafka
library has appropriate transitive dependencies.

Either version of the spark-streaming-kafka library should work with
1.0 brokers; what problems were you having?



On Mon, Dec 25, 2017 at 7:58 PM, Diogo Munaro Vieira
 wrote:
> Hey Serkan, it depends of your Kafka version... Is it 0.8.2?
>
> Em 25 de dez de 2017 06:17, "Serkan TAS"  escreveu:
>>
>> Hi,
>>
>>
>>
>> Working on spark 2.2.0 cluster and 1.0 kafka brokers.
>>
>>
>>
>> I was using the library
>>
>> "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"
>>
>>
>>
>> and had lots of problems during streaming process then downgraded to
>>
>>"org.apache.spark" % "spark-streaming-kafka-0-8_2.11" %
>> "2.2.0"
>>
>>
>>
>> And i know there is also another path which is using kafka-clients jars
>> which has the latest version of 1.0.0
>>
>>
>>
>> 
>>
>> 
>>
>> org.apache.kafka
>>
>> kafka-clients
>>
>> 1.0.0
>>
>> 
>>
>>
>>
>> I am confused which path  is the  right one
>>
>>
>>
>> Thanks…
>>
>>
>>
>>
>>
>>
>> 
>>
>> Bu ileti hukuken korunmuş, gizli veya ifşa edilmemesi gereken bilgiler
>> içerebilir. Şayet mesajın gönderildiği kişi değilseniz, bu iletiyi çoğaltmak
>> ve dağıtmak yasaktır. Bu mesajı yanlışlıkla alan kişi, bu durumu derhal
>> gönderene telefonla ya da e-posta ile bildirmeli ve bilgisayarından
>> silmelidir. Bu iletinin içeriğinden yalnızca iletiyi gönderen kişi
>> sorumludur.
>>
>> This communication may contain information that is legally privileged,
>> confidential or exempt from disclosure. If you are not the intended
>> recipient, please note that any dissemination, distribution, or copying of
>> this communication is strictly prohibited. Anyone who receives this message
>> in error should notify the sender immediately by telephone or by return
>> communication and delete it from his or her computer. Only the person who
>> has sent this message is responsible for its content.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Cody Koeninger
You can't create a network connection to kafka on the driver and then
serialize it to send it the executor.  That's likely why you're getting
serialization errors.

Kafka producers are thread safe and designed for use as a singleton.

Use a lazy singleton instance of the producer on the executor, don't pass
it in.

On Mon, Dec 18, 2017 at 9:20 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Couldn’t you readStream from Kafka, do your transformations, map your rows
> from the transformed input into what you want need to send to Kafka, then
> writeStream to Kafka?
>
>
>
>
>
> *From: *Liana Napalkova 
> *Date: *Monday, December 18, 2017 at 10:07 AM
> *To: *Silvio Fiorito , "
> user@spark.apache.org" 
>
> *Subject: *Re: How to properly execute `foreachPartition` in Spark 2.2
>
>
>
> I need to firstly read from Kafka queue into a DataFrame. Then I should
> perform some transformations with the data. Finally, for each row in the
> DataFrame I should conditionally apply KafkaProducer in order to send some
> data to Kafka.
>
> So, I am both consuming and producing the data from/to Kafka.
>
>
> --
>
> *From:* Silvio Fiorito 
> *Sent:* 18 December 2017 16:00:39
> *To:* Liana Napalkova; user@spark.apache.org
> *Subject:* Re: How to properly execute `foreachPartition` in Spark 2.2
>
>
>
> Why don’t you just use the Kafka sink for Spark 2.2?
>
>
>
> https://spark.apache.org/docs/2.2.0/structured-streaming-
> kafka-integration.html#creating-a-kafka-sink-for-streaming-queries
>
>
>
>
>
>
>
> *From: *Liana Napalkova 
> *Date: *Monday, December 18, 2017 at 9:45 AM
> *To: *"user@spark.apache.org" 
> *Subject: *How to properly execute `foreachPartition` in Spark 2.2
>
>
>
> Hi,
>
>
>
> I wonder how to properly execute `foreachPartition` in Spark 2.2. Below I
> explain the problem is details. I appreciate any help.
>
>
>
> In Spark 1.6 I was doing something similar to this:
>
>
>
> DstreamFromKafka.foreachRDD(session => {
> session.foreachPartition { partitionOfRecords =>
>   println("Setting the producer.")
>   val producer = Utils.createProducer(mySet.
> value("metadataBrokerList"),
>
> mySet.value("batchSize"),
>
> mySet.value("lingerMS"))
>   partitionOfRecords.foreach(s => {
>
>  //...
>
>
>
> However, I cannot find the proper way to do the similar thing in Spark
> 2.2. I tried to write my own class by extending `ForeachWriter`, but I get
> Task Serialization error when passing `KafkaProducer`.
>
> *class *MyTestClass(
> // *val inputparams*: String)
>   *extends *Serializable
> {
>
>   *val **spark *= SparkSession
> .*builder*()
> .appName("TEST")
> //.config("spark.sql.warehouse.dir", kafkaData)
> .enableHiveSupport()
> .getOrCreate()
>
> *import **spark*.implicits._
>
> *val *df: Dataset[String] = *spark*.readStream
>  .format("kafka")
>  .option("kafka.bootstrap.servers", "localhost:9092")
>  .option("subscribe", "test")
>  .option("startingOffsets", "latest")
>  .option("failOnDataLoss", "true")
>  .load()
>  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, 
> String)] // Kafka sends bytes
>  .map(_._2)
>
> *val *producer = // create KafkaProducer
>
> *val *writer = *new *MyForeachWriter(producer: KafkaProducer[String,String])
>
> *val *query = df
>   .writeStream
>   .foreach(writer)
>   .start
>
> query.awaitTermination()
>
> *spark*.stop()
>
>
> *class *MyForeachWriter *extends *ForeachWriter[String] *with *Serializable {
>
>   *var **producer*: KafkaProducer[String,String] = _
>
>   *def this*(producer: KafkaProducer[String,String])
>   {
> *this*()
> *this*.*producer *= producer
>   }
>
>   *override def *process(row: String): Unit =
>   {
> // ...
>   }
>
>   *override def *close(errorOrNull: Throwable): Unit = {}
>
>   *override def *open(partitionId: Long, version: Long): Boolean = {
>
> *true  *}
>
> }
>
>
>
>
>
> *Liana Napalkova, PhD*
>
> *Big Data Analytics Unit*
> * -- *
>
>
>
>
>
> *T  +34 **93 238 14 00 (ext. 1248)*
> *M +34 **633 426 677*
>
> *liana.napalk...@eurecat.org *
> --
>
> Carrer Camí Antic de València 54
> -56,
> Edifici A - 08005 - Barcelona
> www.eurecat.org
>
> Ascamm, BDigital, Barcelona Media i Cetemmsa ara som Eurecat
>
>
> --
> DISCLAIMER: Aquest missatge pot contenir informació confidencial. Si vostè
> no n'és el destinatari, si us plau, esborri'l i faci'ns-ho saber
> immediatament a la següent adreça: le...@eurecat.org Si el destinatari
> d'aquest missatge no consent 

Re: bulk upsert data batch from Kafka dstream into Postgres db

2017-12-14 Thread Cody Koeninger
Modern versions of postgres have upsert, ie insert into ... on
conflict ... do update

On Thu, Dec 14, 2017 at 11:26 AM, salemi  wrote:
> Thank you for your respond.
> The approach loads just the data into the DB. I am looking for an approach
> that allows me to update  existing entries in the DB amor insert a new entry
> if it doesn't exist.
>
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: bulk upsert data batch from Kafka dstream into Postgres db

2017-12-14 Thread Cody Koeninger
use foreachPartition(), get a connection from a jdbc connection pool,
and insert the data the same way you would in a non-spark program.

If you're only doing inserts, postgres COPY will be faster (e.g.
https://discuss.pivotal.io/hc/en-us/articles/204237003), but if you're
doing updates that's not an option.

Depending on how many spark partitions you have, coalesce() to
decrease the number of partitions may help avoid database contention
and speed things up, but you'll need to experiment.

On Wed, Dec 13, 2017 at 11:52 PM, salemi  wrote:
> Hi All,
>
> we are consuming messages from Kafka using Spark dsteam. Once the processing
> is done we would like to update/insert the data in bulk fashion into the
> database.
>
> I was wondering what the best solution for this might be. Our Postgres
> database table is not partitioned.
>
>
> Thank you,
>
> Ali
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark streaming] No assigned partition error during seek

2017-12-01 Thread Cody Koeninger
Yeah, don't mix multiple versions of kafka clients.  That's not 100%
certain to be the cause of your problem, but it can't be helping.

As for your comments about async commits, read

https://issues.apache.org/jira/browse/SPARK-22486

and if you think your use case is still relevant to others given those
constraints, then share it.

On Fri, Dec 1, 2017 at 4:11 AM, Qiao, Richard
<richard.q...@capitalone.com> wrote:
> In your case, it looks it’s trying to make 2 versions Kafka existed in the
> same JVM at runtime. There is version conflict.
>
>
>
> About “I dont find the spark async commit  useful for our needs”, do you
> mean to say the code like below?
>
> kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
>
>
>
>
>
> Best Regards
>
> Richard
>
>
>
>
>
> From: venkat <meven...@gmail.com>
> Date: Thursday, November 30, 2017 at 8:16 PM
> To: Cody Koeninger <c...@koeninger.org>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: [Spark streaming] No assigned partition error during seek
>
>
>
> I notice that 'Do not manually add dependencies on org.apache.kafka
> artifacts (e.g. kafka-clients). The spark-streaming-kafka-0-10 artifact has
> the appropriate transitive dependencies already, and different versions may
> be incompatible in hard to diagnose way' after your query.
>
> Does this imply that we should not be adding kafka clients in our jars?.
>
> Thanks
>
> Venkat
>
>
>
> On Fri, 1 Dec 2017 at 06:45 venkat <meven...@gmail.com> wrote:
>
> Yes I use latest Kafka clients 0.11 to determine beginning offsets without
> seek and also I use Kafka offsets commits externally.
>
> I dont find the spark async commit  useful for our needs.
>
> Thanks
>
> Venkat
>
>
>
> On Fri, 1 Dec 2017 at 02:39 Cody Koeninger <c...@koeninger.org> wrote:
>
> You mentioned 0.11 version; the latest version of org.apache.kafka
> kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
> has an appropriate dependency.
>
> Are you manually depending on a different version of the kafka-clients
> artifact?
>
> On Fri, Nov 24, 2017 at 7:39 PM, venks61176 <meven...@gmail.com> wrote:
>> Version: 2.2 with Kafka010
>>
>> Hi,
>>
>> We are running spark streaming on AWS and trying to process incoming
>> messages on Kafka topics. All was well.
>> Recently we wanted to migrate from 0.8 to 0.11 version of Spark library
>> and
>> Kafka 0.11 version of server.
>>
>> With this new version of software we are facing issues with regard to 'No
>> assignment to partition for a topic and it happens intermittently'. I
>> construct four DStreams with different group.ids as suggested.
>>
>> The main source of code thats causing the issue is this one
>>
>> if (!toSeek.isEmpty) {
>>   // work around KAFKA-3370 when reset is none
>>   // poll will throw if no position, i.e. auto offset reset none and
>> no
>> explicit position
>>   // but cant seek to a position before poll, because poll is what
>> gets
>> subscription partitions
>>   // So, poll, suppress the first exception, then seek
>>   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>>   val shouldSuppress = aor != null &&
>> aor.asInstanceOf[String].toUpperCase == "NONE"
>>   try {
>> consumer.poll(0)
>>   } catch {
>> case x: NoOffsetForPartitionException if shouldSuppress =>
>>   logWarning("Catching NoOffsetForPartitionException since " +
>> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See
>> KAFKA-3370")
>>   }
>>   toSeek.asScala.foreach { case (topicPartition, offset) =>
>>   *consumer.seek(topicPartition, offset)*
>>   }
>> }
>>
>> At the start of the job, I also ensure we are supplying all required
>> offsets
>> correctly
>>
>> private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
>> Map<TopicPartition, Long> offsets = new HashMap<>();
>> List topicPartitions =
>> consumer.partitionsFor(topic).stream().map(partitionInfo ->
>> new TopicPartition(partitionInfo.topic(),
>> partitionInfo.partition()))
>> .collect(Collectors.toList());
>> Map<TopicPartition, Long> earliestOffsets =
>> consumer.beginningOffsets(topicPartitions);
>> // pick committed offsets
>> for (TopicPartition topicAndPartition : topicPartitions) {
&

Re: Kafka version support

2017-11-30 Thread Cody Koeninger
Are you talking about the broker version, or the kafka-clients artifact version?

On Thu, Nov 30, 2017 at 12:17 AM, Raghavendra Pandey
 wrote:
> Just wondering if anyone has tried spark structured streaming kafka
> connector (2.2) with Kafka 0.11 or Kafka 1.0 version
>
> Thanks
> Raghav

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark streaming] No assigned partition error during seek

2017-11-30 Thread Cody Koeninger
You mentioned 0.11 version; the latest version of org.apache.kafka
kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
has an appropriate dependency.

Are you manually depending on a different version of the kafka-clients artifact?

On Fri, Nov 24, 2017 at 7:39 PM, venks61176  wrote:
> Version: 2.2 with Kafka010
>
> Hi,
>
> We are running spark streaming on AWS and trying to process incoming
> messages on Kafka topics. All was well.
> Recently we wanted to migrate from 0.8 to 0.11 version of Spark library and
> Kafka 0.11 version of server.
>
> With this new version of software we are facing issues with regard to 'No
> assignment to partition for a topic and it happens intermittently'. I
> construct four DStreams with different group.ids as suggested.
>
> The main source of code thats causing the issue is this one
>
> if (!toSeek.isEmpty) {
>   // work around KAFKA-3370 when reset is none
>   // poll will throw if no position, i.e. auto offset reset none and no
> explicit position
>   // but cant seek to a position before poll, because poll is what gets
> subscription partitions
>   // So, poll, suppress the first exception, then seek
>   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>   val shouldSuppress = aor != null &&
> aor.asInstanceOf[String].toUpperCase == "NONE"
>   try {
> consumer.poll(0)
>   } catch {
> case x: NoOffsetForPartitionException if shouldSuppress =>
>   logWarning("Catching NoOffsetForPartitionException since " +
> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See
> KAFKA-3370")
>   }
>   toSeek.asScala.foreach { case (topicPartition, offset) =>
>   *consumer.seek(topicPartition, offset)*
>   }
> }
>
> At the start of the job, I also ensure we are supplying all required offsets
> correctly
>
> private Map getCommittedOffsets(String topic) {
> Map offsets = new HashMap<>();
> List topicPartitions =
> consumer.partitionsFor(topic).stream().map(partitionInfo ->
> new TopicPartition(partitionInfo.topic(),
> partitionInfo.partition()))
> .collect(Collectors.toList());
> Map earliestOffsets =
> consumer.beginningOffsets(topicPartitions);
> // pick committed offsets
> for (TopicPartition topicAndPartition : topicPartitions) {
>   final OffsetAndMetadata committed =
> consumer.committed(topicAndPartition);
>   Long earliestOffset = earliestOffsets.get(topicAndPartition);
>   if (committed != null && committed.offset() > earliestOffset) {
> logger
> .warn(
> "Committed offset found for: {} offset:{} -> Hence adding
> committed offset",
> topicAndPartition, committed.offset());
> offsets.put(topicAndPartition, committed.offset());
>   } else {
> logger
> .warn(
> "New partition/stale offset found for: {} offset:{} -> Hence
> adding earliest offset",
> topicAndPartition, earliestOffset);
> offsets.put(topicAndPartition, earliestOffset);
>   }
> }
> return offsets;
>   }
>
> The actual stack trace:
>
> Caused by: java.lang.IllegalStateException: No current assignment for
> partition genericEvents-1
> 2017-11-23 10:35:24,677 -at
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
> 2017-11-23 10:35:24,677 -at
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
> 2017-11-23 10:35:24,677 -at
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
> 2017-11-23 10:35:24,678 -at
> org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
> 2017-11-23 10:35:24,678 -at
> org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
> 2017-11-23 10:35:24,678 -at
> scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 2017-11-23 10:35:24,678 -at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 2017-11-23 10:35:24,678 -at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 2017-11-23 10:35:24,678 -at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 2017-11-23 10:35:24,678 -at
> org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
> 2017-11-23 10:35:24,679 -at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
> 2017-11-23 10:35:24,679 -at
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
> 2017-11-23 10:35:24,679 -at
> org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
> 2017-11-23 10:35:24,679 -at
> 

Re: Spark Streaming fails with unable to get records after polling for 512 ms

2017-11-17 Thread Cody Koeninger
I don't see anything obvious, you'd need to do more troubleshooting.

Could also try creating a single rdd for a known range of offsets:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-an-rdd

On Wed, Nov 15, 2017 at 9:33 PM, jagadish kagitala <jka...@gmail.com> wrote:
> Hi Cody,
>
> It worked, after moving the parameter to sparkConf. I don't see that error.
> But, Now i'm seeing the count for each RDD returns 0. But, there are records
> in the topic i'm reading.
>
> Do you see anything wrong with how i'm creating the Direct Stream ?
>
> Thanks
> Jagadish
>
> On Wed, Nov 15, 2017 at 11:23 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> spark.streaming.kafka.consumer.poll.ms  is a spark configuration, not
>> a kafka parameter.
>>
>> see http://spark.apache.org/docs/latest/configuration.html
>>
>> On Tue, Nov 14, 2017 at 8:56 PM, jkagitala <jka...@gmail.com> wrote:
>> > Hi,
>> >
>> > I'm trying to add spark-streaming to our kafka topic. But, I keep
>> > getting
>> > this error
>> > java.lang.AssertionError: assertion failed: Failed to get record after
>> > polling for 512 ms.
>> >
>> > I tried to add different params like max.poll.interval.ms,
>> > spark.streaming.kafka.consumer.poll.ms to 1ms in kafkaParams.
>> > But, i still get failed to get records after 512ms. Not sure, even
>> > adding
>> > the above params doesn't change the polling time.
>> >
>> > Without spark-streaming, i'm able to fetch the records. Only with
>> > spark-streaming addon, i get this error.
>> >
>> > Any help is greatly appreciated. Below, is the code i'm using.
>> >
>> > SparkConf sparkConf = new
>> >
>> > SparkConf().setAppName("JavaFlingerSparkApplication").setMaster("local[*]");
>> > JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>> > Durations.seconds(10));
>> >
>> > kafkaParams.put("bootstrap.servers", hosts);
>> > kafkaParams.put("group.id", groupid);
>> > kafkaParams.put("auto.commit.enable", false);
>> > kafkaParams.put("key.deserializer", StringDeserializer.class);
>> > kafkaParams.put("value.deserializer", BytesDeserializer.class);
>> > kafkaParams.put("auto.offset.reset", "earliest");
>> > //kafkaParams.put("max.poll.interval.ms", 12000);
>> > //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
>> > //kafkaParams.put("request.timeout.ms", 12000);
>> >
>> >
>> > JavaInputDStream<ConsumerRecordString, ListBytes>>> messages =
>> >   KafkaUtils.createDirectStream(ssc,
>> > LocationStrategies.PreferConsistent(),
>> >
>> > ConsumerStrategies.Subscribe(topics, kafkaParams));
>> > messages.foreachRDD(rdd -> {
>> > List<ConsumerRecordString, ListBytes>>> input =
>> > rdd.collect();
>> > System.out.println("count is"+input.size());
>> > });
>> > ssc.start();
>> > ssc.awaitTermination();
>> >
>> > Thanks
>> > Jagadish
>> >
>> >
>> >
>> > --
>> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming fails with unable to get records after polling for 512 ms

2017-11-15 Thread Cody Koeninger
spark.streaming.kafka.consumer.poll.ms  is a spark configuration, not
a kafka parameter.

see http://spark.apache.org/docs/latest/configuration.html

On Tue, Nov 14, 2017 at 8:56 PM, jkagitala  wrote:
> Hi,
>
> I'm trying to add spark-streaming to our kafka topic. But, I keep getting
> this error
> java.lang.AssertionError: assertion failed: Failed to get record after
> polling for 512 ms.
>
> I tried to add different params like max.poll.interval.ms,
> spark.streaming.kafka.consumer.poll.ms to 1ms in kafkaParams.
> But, i still get failed to get records after 512ms. Not sure, even adding
> the above params doesn't change the polling time.
>
> Without spark-streaming, i'm able to fetch the records. Only with
> spark-streaming addon, i get this error.
>
> Any help is greatly appreciated. Below, is the code i'm using.
>
> SparkConf sparkConf = new
> SparkConf().setAppName("JavaFlingerSparkApplication").setMaster("local[*]");
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> Durations.seconds(10));
>
> kafkaParams.put("bootstrap.servers", hosts);
> kafkaParams.put("group.id", groupid);
> kafkaParams.put("auto.commit.enable", false);
> kafkaParams.put("key.deserializer", StringDeserializer.class);
> kafkaParams.put("value.deserializer", BytesDeserializer.class);
> kafkaParams.put("auto.offset.reset", "earliest");
> //kafkaParams.put("max.poll.interval.ms", 12000);
> //kafkaParams.put("spark.streaming.kafka.consumer.poll.ms", 12000);
> //kafkaParams.put("request.timeout.ms", 12000);
>
>
> JavaInputDStream>> messages =
>   KafkaUtils.createDirectStream(ssc,
> LocationStrategies.PreferConsistent(),
>
> ConsumerStrategies.Subscribe(topics, kafkaParams));
> messages.foreachRDD(rdd -> {
> List>> input = 
> rdd.collect();
> System.out.println("count is"+input.size());
> });
> ssc.start();
> ssc.awaitTermination();
>
> Thanks
> Jagadish
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: FW: Kafka Direct Stream - dynamic topic subscription

2017-10-29 Thread Cody Koeninger
As it says in SPARK-10320 and in the docs at
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#consumerstrategies
, you can use SubscribePattern

On Sun, Oct 29, 2017 at 3:56 PM, Ramanan, Buvana (Nokia - US/Murray
Hill)  wrote:
> Hello Cody,
>
>
>
> As the stake holders of JIRA SPARK-10320 issue, can you please explain the
> purpose of dynamic topic subscription? Does it mean adapting the consumer to
> read from the new partitions that might get created after the SparkStreaming
> job begins? Is there a succinct writeup on the dynamic topic subscription
> feature that you can share?
>
>
>
> Also, is there  a way I can subscribe to topics whose name matches a regular
> expression (some Kafka consumers such as kafka-python python library support
> that)?
>
>
>
> I forward the email I sent to spark users group that contains a little more
> background on my question.
>
>
>
> Thank you,
>
> Regards,
>
> Buvana
>
>
>
> From: Ramanan, Buvana (Nokia - US/Murray Hill)
> [mailto:buvana.rama...@nokia-bell-labs.com]
> Sent: Friday, October 27, 2017 10:46 PM
> To: user@spark.apache.org
> Subject: Kafka Direct Stream - dynamic topic subscription
>
>
>
> Hello,
>
>
>
> Using Spark 2.2.0. Interested in seeing the action of dynamic topic
> subscription.
>
>
>
> Tried this example: streaming.DirectKafkaWordCount (which uses
> org.apache.spark.streaming.kafka010)
>
>
>
> I start with 8 Kafka partitions in my topic and found that Spark Streaming
> executes 8 tasks (one per partition), which is what is expected. While this
> example process was going on, I increased the Kafka partitions to 16 and
> started producing data to the new partitions as well.
>
>
>
> I expected that the Kafka consumer that Spark uses, would detect this change
> and spawn new tasks for the new partitions. But I find that it only reads
> from the old partitions and does not read from new partitions. When I do a
> restart, it reads from all 16 partitions.
>
>
>
> Is this expected?
>
>
>
> What is meant by dynamic topic subscription?
>
>
>
> Does it apply only to topics with a name that matches a regular expression
> and it does not apply to dynamically growing partitions?
>
>
>
> Thanks,
>
> Buvana
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to read from multiple kafka topics using structured streaming (spark 2.2.0)?

2017-09-19 Thread Cody Koeninger
You should be able to pass a comma separated string of topics to
subscribe.  subscribePattern isn't necessary



On Tue, Sep 19, 2017 at 2:54 PM, kant kodali  wrote:
> got it! Sorry.
>
> On Tue, Sep 19, 2017 at 12:52 PM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> Use subscribepattern
>>
>> You haven't googled well enough -->
>> https://jaceklaskowski.gitbooks.io/spark-structured-streaming/spark-sql-streaming-KafkaSource.html
>> :)
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://about.me/JacekLaskowski
>> Spark Structured Streaming (Apache Spark 2.2+)
>> https://bit.ly/spark-structured-streaming
>> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>> On Tue, Sep 19, 2017 at 9:50 PM, kant kodali  wrote:
>>>
>>> HI All,
>>>
>>> I am wondering How to read from multiple kafka topics using structured
>>> streaming (code below)? I googled prior to asking this question and I see
>>> responses related to Dstreams but not structured streams. Is it possible to
>>> read multiple topics using the same spark structured stream?
>>>
>>> sparkSession.readStream()
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", "localhost:9092")
>>> .option("subscribe", "hello1")
>>> .option("startingOffsets", "earliest")
>>> .option("failOnDataLoss", "false")
>>> .load();
>>>
>>>
>>> Thanks!
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: ConcurrentModificationException using Kafka Direct Stream

2017-09-18 Thread Cody Koeninger
Have you searched in jira, e.g.

https://issues.apache.org/jira/browse/SPARK-19185

On Mon, Sep 18, 2017 at 1:56 AM, HARSH TAKKAR  wrote:
> Hi
>
> Changing spark version if my last resort, is there any other workaround for
> this problem.
>
>
> On Mon, Sep 18, 2017 at 11:43 AM pandees waran  wrote:
>>
>> All, May I know what exactly changed in 2.1.1 which solved this problem?
>>
>> Sent from my iPhone
>>
>> On Sep 17, 2017, at 11:08 PM, Anastasios Zouzias 
>> wrote:
>>
>> Hi,
>>
>> I had a similar issue using 2.1.0 but not with Kafka. Updating to 2.1.1
>> solved my issue. Can you try with 2.1.1 as well and report back?
>>
>> Best,
>> Anastasios
>>
>> Am 17.09.2017 16:48 schrieb "HARSH TAKKAR" :
>>
>>
>> Hi
>>
>> I am using spark 2.1.0 with scala  2.11.8, and while iterating over the
>> partitions of each rdd in a dStream formed using KafkaUtils, i am getting
>> the below exception, please suggest a fix.
>>
>> I have following config
>>
>> kafka :
>> enable.auto.commit:"true",
>> auto.commit.interval.ms:"1000",
>> session.timeout.ms:"3",
>>
>> Spark:
>>
>> spark.streaming.backpressure.enabled=true
>>
>> spark.streaming.kafka.maxRatePerPartition=200
>>
>>
>> Exception in task 0.2 in stage 3236.0 (TID 77795)
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access
>>
>> --
>> Kind Regards
>> Harsh
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Multiple Kafka topics processing in Spark 2.2

2017-09-11 Thread Cody Koeninger
If you want an "easy" but not particularly performant way to do it,
each org.apache.kafka.clients.consumer.ConsumerRecord
has a topic.

The topic is going to be the same for the entire partition as long as you
haven't shuffled, hence the examples on how to deal with it at a partition
level.

On Fri, Sep 8, 2017 at 8:29 PM, Dan Dong  wrote:

> Hi,Alonso.
>   Thanks! I've read about this but did not quite understand it. To pick
> out the topic name of a kafka message seems a simple task but the example
> code looks so complicated with redundent info. Why do we need offsetRanges
> here and do we have a easy way to achieve this?
>
> Cheers,
> Dan
>
>
> 2017-09-06 21:17 GMT+08:00 Alonso Isidoro Roman :
>
>> Hi, reading the official doc
>> ,
>> i think you can do it this way:
>>
>> import org.apache.spark.streaming.kafka._
>>
>>val directKafkaStream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](
>>
>>   ssc, kafkaParams, topicsSet)
>>
>>
>>  // Hold a reference to the current offset ranges, so it can be used 
>> downstream
>>  var offsetRanges = Array.empty[OffsetRange]
>>
>>  directKafkaStream.transform { rdd =>
>>offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>rdd
>>  }.map {
>>...
>>  }.foreachRDD { rdd =>
>>for (o <- offsetRanges) {
>>  println(*s"${o.topic}* ${o.partition} ${o.fromOffset} ${o.untilOffset}")
>>}
>>
>>  }
>>
>>
>> 2017-09-06 14:38 GMT+02:00 Dan Dong :
>>
>>> Hi, All,
>>>   I have one issue here about how to process multiple Kafka topics in a
>>> Spark 2.* program. My question is: How to get the topic name from a message
>>> received from Kafka? E.g:
>>>
>>> ..
>>> val messages = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](
>>>   ssc, kafkaParams, topicsSet)
>>>
>>> // Get the lines, split them into words, count the words and print
>>> val lines = messages.map(_._2)
>>> val words = lines.flatMap(_.split(" "))
>>> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>> wordCounts.print()
>>> ..
>>>
>>> Kafka send the messages in multiple topics through console producer for
>>> example. But when Spark receive the message, how it will know which topic
>>> is this piece of message coming from? Thanks a lot for any of your helps!
>>>
>>> Cheers,
>>> Dan
>>>
>>
>>
>>
>> --
>> Alonso Isidoro Roman
>> [image: https://]about.me/alonso.isidoro.roman
>>
>> 
>>
>
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-29 Thread Cody Koeninger
I don't see anything obvious.  If the slowness is correlated with the
errors you're seeing, I'd start looking at what's going on with kafka or
your network.

On Mon, Aug 28, 2017 at 7:06 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> Hi Cody,
>
> Following is the way that I am consuming data for a 60 second batch. Do
> you see anything that is wrong with the way the data is getting consumed
> that can cause slowness in performance?
>
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> kafkaBrokers,
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "auto.offset.reset" -> "latest",
>   "heartbeat.interval.ms" -> Integer.valueOf(2),
>   "session.timeout.ms" -> Integer.valueOf(6),
>   "request.timeout.ms" -> Integer.valueOf(9),
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "group.id" -> "test1"
> )
>
>   val hubbleStream = KafkaUtils.createDirectStream[String, String](
> ssc,
> LocationStrategies.PreferConsistent,
> ConsumerStrategies.Subscribe[String, String](topicsSet,
> kafkaParams)
>   )
>
> val kafkaStreamRdd = kafkaStream.transform { rdd =>
> rdd.map(consumerRecord => (consumerRecord.key(), consumerRecord.value()))
> }
>
> On Mon, Aug 28, 2017 at 11:56 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> There is no difference in performance even with Cache being enabled.
>>
>> On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> There is no difference in performance even with Cache being disabled.
>>>
>>> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> So if you can run with cache enabled for some time, does that
>>>> significantly affect the performance issue you were seeing?
>>>>
>>>> Those settings seem reasonable enough.   If preferred locations is
>>>> behaving correctly you shouldn't need cached consumers for all 96
>>>> partitions on any one executor, so that maxCapacity setting is
>>>> probably unnecessary.
>>>>
>>>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>>>> <swethakasire...@gmail.com> wrote:
>>>> > Because I saw some posts that say that consumer cache  enabled will
>>>> have
>>>> > concurrentModification exception with reduceByKeyAndWIndow. I see
>>>> those
>>>> > errors as well after running for sometime with cache being enabled.
>>>> So, I
>>>> > had to disable it. Please see the tickets below.  We have 96
>>>> partitions. So
>>>> > if I enable cache, would teh following settings help to improve
>>>> performance?
>>>> >
>>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>>>> Integer.valueOf(96),
>>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" ->
>>>> Integer.valueOf(96),
>>>> >
>>>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>>>> >
>>>> >
>>>> > http://markmail.org/message/n4cdxwurlhf44q5x
>>>> >
>>>> > https://issues.apache.org/jira/browse/SPARK-19185
>>>> >
>>>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>> >>
>>>> >> Why are you setting consumer.cache.enabled to false?
>>>> >>
>>>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com>
>>>> wrote:
>>>> >> > Hi,
>>>> >> >
>>>> >> > What would be the appropriate settings to run Spark with Kafka 10?
>>>> My
>>>> >> > job
>>>> >> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But
>>>> its
>>>> >> > very
>>>> >> > slow with Kafka 10 by using Kafka Direct' experimental APIs for
>>>> Kafka 10
>>>> >> > . I
>>>> >> > see the following error som

Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-28 Thread Cody Koeninger
So if you can run with cache enabled for some time, does that
significantly affect the performance issue you were seeing?

Those settings seem reasonable enough.   If preferred locations is
behaving correctly you shouldn't need cached consumers for all 96
partitions on any one executor, so that maxCapacity setting is
probably unnecessary.

On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
<swethakasire...@gmail.com> wrote:
> Because I saw some posts that say that consumer cache  enabled will have
> concurrentModification exception with reduceByKeyAndWIndow. I see those
> errors as well after running for sometime with cache being enabled. So, I
> had to disable it. Please see the tickets below.  We have 96 partitions. So
> if I enable cache, would teh following settings help to improve performance?
>
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
>
> "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>
>
> http://markmail.org/message/n4cdxwurlhf44q5x
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Why are you setting consumer.cache.enabled to false?
>>
>> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > What would be the appropriate settings to run Spark with Kafka 10? My
>> > job
>> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> > very
>> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10
>> > . I
>> > see the following error sometimes . Please see the kafka parameters and
>> > the
>> > consumer strategy for creating the stream below. Any suggestions on how
>> > to
>> > run this with better performance would be of great help.
>> >
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> > test
>> > stream1 72 324027964 after polling for 12
>> >
>> > val kafkaParams = Map[String, Object](
>> >   "bootstrap.servers" -> kafkaBrokers,
>> >   "key.deserializer" -> classOf[StringDeserializer],
>> >   "value.deserializer" -> classOf[StringDeserializer],
>> >   "auto.offset.reset" -> "latest",
>> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>> >   "session.timeout.ms" -> Integer.valueOf(6),
>> >   "request.timeout.ms" -> Integer.valueOf(9),
>> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >   "group.id" -> "test1"
>> > )
>> >
>> >   val hubbleStream = KafkaUtils.createDirectStream[String, String](
>> > ssc,
>> > LocationStrategies.PreferConsistent,
>> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>> > kafkaParams)
>> >   )
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka Consumer Pre Fetch Messages + Async commits

2017-08-28 Thread Cody Koeninger
1. No, prefetched message offsets aren't exposed.

2. No, I'm not aware of any plans for sync commit, and I'm not sure
that makes sense.  You have to be able to deal with repeat messages in
the event of failure in any case, so the only difference sync commit
would make would be (possibly) slower run time.

On Sat, Aug 26, 2017 at 1:07 AM, Julia Wistance
 wrote:
> Hi Experts,
>
> A question on what could potentially happen with Spark Streaming 2.2.0 +
> Kafka. LocationStrategies says that "new Kafka consumer API will pre-fetch
> messages into buffers.".
> If we store offsets in Kafka, currently we can only use a async commits.
>
> So,
> 1 - Could it happen that we commit offsets that we havent processed yet but
> the kafka consumers has prefetched
> 2 - Are there plans to support a sync commit? Although we can go for an
> alternate store of commits like HBase / Zookeeper, MySQL etc the code would
> wait till the offsets are stored in either of these systems. It would make
> sense that Spark / Kafka also adds a sync commit option?
>
> Appreciate the reply.
> JW
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread Cody Koeninger
Why are you setting consumer.cache.enabled to false?

On Fri, Aug 25, 2017 at 2:19 PM, SRK  wrote:
> Hi,
>
> What would be the appropriate settings to run Spark with Kafka 10? My job
> works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its very
> slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10 . I
> see the following error sometimes . Please see the kafka parameters and the
> consumer strategy for creating the stream below. Any suggestions on how to
> run this with better performance would be of great help.
>
> java.lang.AssertionError: assertion failed: Failed to get records for test
> stream1 72 324027964 after polling for 12
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> kafkaBrokers,
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "auto.offset.reset" -> "latest",
>   "heartbeat.interval.ms" -> Integer.valueOf(2),
>   "session.timeout.ms" -> Integer.valueOf(6),
>   "request.timeout.ms" -> Integer.valueOf(9),
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "group.id" -> "test1"
> )
>
>   val hubbleStream = KafkaUtils.createDirectStream[String, String](
> ssc,
> LocationStrategies.PreferConsistent,
> ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>   )
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?

2017-08-22 Thread Cody Koeninger
Kafka rdds need to start from a specified offset, you really don't
want the executors just starting at whatever offset happened to be
latest at the time they ran.

If you need a way to figure out the latest offset at the time the
driver starts up, you can always use a consumer to read the offsets
and then pass that to Assign (just make sure that consumer is closed
before the job starts so you don't get group id conflicts).  You can
even make your own implementation of ConsumerStrategy, which should
allow you to do pretty much whatever you need to get the consumer in
the state you want.

On Mon, Aug 21, 2017 at 6:57 PM, swetha kasireddy
<swethakasire...@gmail.com> wrote:
> Hi Cody,
>
> I think the Assign is used if we want it to start from a specified offset.
> What if we want it to start it from the latest offset with something like
> returned by "auto.offset.reset" -> "latest",.
>
>
> Thanks!
>
> On Mon, Aug 21, 2017 at 9:06 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Yes, you can start from specified offsets.  See ConsumerStrategy,
>> specifically Assign
>>
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#your-own-data-store
>>
>> On Tue, Aug 15, 2017 at 1:18 PM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > How to force Spark Kafka Direct to start from the latest offset when the
>> > lag
>> > is huge in kafka 10? It seems to be processing from the latest offset
>> > stored
>> > for a group id. One way to do this is to change the group id. But it
>> > would
>> > mean that each time that we need to process the job from the latest
>> > offset
>> > we have to provide a new group id.
>> >
>> > Is there a way to force the job to run from the latest offset in case we
>> > need to and still use the same group id?
>> >
>> > Thanks!
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/How-to-force-Spark-Kafka-Direct-to-start-from-the-latest-offset-when-the-lag-is-huge-in-kafka-10-tp29071.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How to force Spark Kafka Direct to start from the latest offset when the lag is huge in kafka 10?

2017-08-21 Thread Cody Koeninger
Yes, you can start from specified offsets.  See ConsumerStrategy,
specifically Assign

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#your-own-data-store

On Tue, Aug 15, 2017 at 1:18 PM, SRK  wrote:
> Hi,
>
> How to force Spark Kafka Direct to start from the latest offset when the lag
> is huge in kafka 10? It seems to be processing from the latest offset stored
> for a group id. One way to do this is to change the group id. But it would
> mean that each time that we need to process the job from the latest offset
> we have to provide a new group id.
>
> Is there a way to force the job to run from the latest offset in case we
> need to and still use the same group id?
>
> Thanks!
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-force-Spark-Kafka-Direct-to-start-from-the-latest-offset-when-the-lag-is-huge-in-kafka-10-tp29071.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-09 Thread Cody Koeninger
org.apache.spark.streaming.kafka.KafkaCluster has methods
getLatestLeaderOffsets and getEarliestLeaderOffsets

On Mon, Aug 7, 2017 at 11:37 PM, shyla deshpande
 wrote:
> Thanks TD.
>
> On Mon, Aug 7, 2017 at 8:59 PM, Tathagata Das 
> wrote:
>>
>> I dont think there is any easier way.
>>
>> On Mon, Aug 7, 2017 at 7:32 PM, shyla deshpande 
>> wrote:
>>>
>>> Thanks TD for the response. I forgot to mention that I am not using
>>> structured streaming.
>>>
>>> I was looking into KafkaUtils.createRDD, and looks like I need to get the
>>> earliest and the latest offset for each partition to build the
>>> Array(offsetRange). I wanted to know if there was a easier way.
>>>
>>> 1 reason why we are hesitating to use structured streaming is because I
>>> need to persist the data in Cassandra database which I believe is not
>>> production ready.
>>>
>>>
>>> On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das
>>>  wrote:

 Its best to use DataFrames. You can read from as streaming or as batch.
 More details here.


 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries

 https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

 On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande
  wrote:
>
> Hi all,
>
> What is the easiest way to read all the data from kafka in a batch
> program for a given topic?
> I have 10 kafka partitions, but the data is not much. I would like to
> read  from the earliest from all the partitions for a topic.
>
> I appreciate any help. Thanks


>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread Cody Koeninger
Yes

On Mon, Aug 7, 2017 at 12:32 PM, shyla deshpande
<deshpandesh...@gmail.com> wrote:
> Thanks Cody again.
>
> No. I am doing mapping of the Kafka ConsumerRecord to be able to save it in
> the Cassandra table and saveToCassandra  is an action and my data do get
> saved into Cassandra. It is working as expected 99% of the time except that
> when there is an exception, I did not want the offsets to be committed.
>
> By Filtering for unsuccessful attempts, do you mean filtering the bad
> records...
>
>
>
>
>
>
> On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> If literally all you are doing is rdd.map I wouldn't expect
>> saveToCassandra to happen at all, since map is not an action.
>>
>> Filtering for unsuccessful attempts and collecting those back to the
>> driver would be one way for the driver to know whether it was safe to
>> commit.
>>
>> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
>> <deshpandesh...@gmail.com> wrote:
>> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  --> is
>> > running on executor
>> >
>> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> is
>> > running on driver.
>> >
>> > Is this the reason why kafka offsets are committed even when an
>> > exception is
>> > raised? If so is there a way to commit the offsets only when there are
>> > no
>> > exceptions?
>> >
>> >
>> >
>> > On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande
>> > <deshpandesh...@gmail.com>
>> > wrote:
>> >>
>> >> Thanks again Cody,
>> >>
>> >> My understanding is all the code inside foreachRDD is running on the
>> >> driver except for
>> >> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
>> >>
>> >> When the exception is raised, I was thinking I won't be committing the
>> >> offsets, but the offsets are committed all the time independent of
>> >> whether
>> >> an exception was raised or not.
>> >>
>> >> It will be helpful if you can explain this behavior.
>> >>
>> >>
>> >> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger <c...@koeninger.org>
>> >> wrote:
>> >>>
>> >>> I mean that the kafka consumers running on the executors should not be
>> >>> automatically committing, because the fact that a message was read by
>> >>> the consumer has no bearing on whether it was actually successfully
>> >>> processed after reading.
>> >>>
>> >>> It sounds to me like you're confused about where code is running.
>> >>> foreachRDD runs on the driver, not the executor.
>> >>>
>> >>>
>> >>>
>> >>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>> >>>
>> >>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>> >>> <deshpandesh...@gmail.com> wrote:
>> >>> > Thanks Cody for your response.
>> >>> >
>> >>> > All I want to do is, commit the offsets only if I am successfully
>> >>> > able
>> >>> > to
>> >>> > write to cassandra database.
>> >>> >
>> >>> > The line //save the rdd to Cassandra database is
>> >>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
>> >>> >
>> >>> > What do you mean by Executors shouldn't be auto-committing, that's
>> >>> > why
>> >>> > it's
>> >>> > being overridden. It is the executors that do the mapping and saving
>> >>> > to
>> >>> > cassandra. The status of success or failure of this operation is
>> >>> > known
>> >>> > only
>> >>> > on the executor and thats where I want to commit the kafka offsets.
>> >>> > If
>> >>> > this
>> >>> > is not what I sould be doing, then  what is the right way?
>> >>> >
>> >>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger <c...@koeninger.org>
>> >>> > wrote:
>> >>> >>
>> >>> >> If your complaint is about offsets being committed that yo

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread Cody Koeninger
If literally all you are doing is rdd.map I wouldn't expect
saveToCassandra to happen at all, since map is not an action.

Filtering for unsuccessful attempts and collecting those back to the
driver would be one way for the driver to know whether it was safe to
commit.

On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
<deshpandesh...@gmail.com> wrote:
> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  --> is
> running on executor
>
> stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> is
> running on driver.
>
> Is this the reason why kafka offsets are committed even when an exception is
> raised? If so is there a way to commit the offsets only when there are no
> exceptions?
>
>
>
> On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande <deshpandesh...@gmail.com>
> wrote:
>>
>> Thanks again Cody,
>>
>> My understanding is all the code inside foreachRDD is running on the
>> driver except for
>> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
>>
>> When the exception is raised, I was thinking I won't be committing the
>> offsets, but the offsets are committed all the time independent of whether
>> an exception was raised or not.
>>
>> It will be helpful if you can explain this behavior.
>>
>>
>> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>> I mean that the kafka consumers running on the executors should not be
>>> automatically committing, because the fact that a message was read by
>>> the consumer has no bearing on whether it was actually successfully
>>> processed after reading.
>>>
>>> It sounds to me like you're confused about where code is running.
>>> foreachRDD runs on the driver, not the executor.
>>>
>>>
>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>
>>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>>> <deshpandesh...@gmail.com> wrote:
>>> > Thanks Cody for your response.
>>> >
>>> > All I want to do is, commit the offsets only if I am successfully able
>>> > to
>>> > write to cassandra database.
>>> >
>>> > The line //save the rdd to Cassandra database is
>>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
>>> >
>>> > What do you mean by Executors shouldn't be auto-committing, that's why
>>> > it's
>>> > being overridden. It is the executors that do the mapping and saving to
>>> > cassandra. The status of success or failure of this operation is known
>>> > only
>>> > on the executor and thats where I want to commit the kafka offsets. If
>>> > this
>>> > is not what I sould be doing, then  what is the right way?
>>> >
>>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger <c...@koeninger.org>
>>> > wrote:
>>> >>
>>> >> If your complaint is about offsets being committed that you didn't
>>> >> expect... auto commit being false on executors shouldn't have anything
>>> >> to do with that.  Executors shouldn't be auto-committing, that's why
>>> >> it's being overridden.
>>> >>
>>> >> What you've said and the code you posted isn't really enough to
>>> >> explain what your issue is, e.g.
>>> >>
>>> >> is this line
>>> >> // save the rdd to Cassandra database
>>> >> a blocking call
>>> >>
>>> >> are you sure that the rdd foreach isn't being retried and succeeding
>>> >> the second time around, etc
>>> >>
>>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>>> >> <deshpandesh...@gmail.com> wrote:
>>> >> > Hello All,
>>> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
>>> >> >
>>> >> > I am setting enable.auto.commit to false, and manually want to
>>> >> > commit
>>> >> > the
>>> >> > offsets after my output operation is successful. So when a exception
>>> >> > is
>>> >> > raised during during the processing I do not want the offsets to be
>>> >> > committed. But looks like the offsets are automatically committed
>>> >> > even
>>> >> > when
&

Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread Cody Koeninger
I mean that the kafka consumers running on the executors should not be
automatically committing, because the fact that a message was read by
the consumer has no bearing on whether it was actually successfully
processed after reading.

It sounds to me like you're confused about where code is running.
foreachRDD runs on the driver, not the executor.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
<deshpandesh...@gmail.com> wrote:
> Thanks Cody for your response.
>
> All I want to do is, commit the offsets only if I am successfully able to
> write to cassandra database.
>
> The line //save the rdd to Cassandra database is
> rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
>
> What do you mean by Executors shouldn't be auto-committing, that's why it's
> being overridden. It is the executors that do the mapping and saving to
> cassandra. The status of success or failure of this operation is known only
> on the executor and thats where I want to commit the kafka offsets. If this
> is not what I sould be doing, then  what is the right way?
>
> On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> If your complaint is about offsets being committed that you didn't
>> expect... auto commit being false on executors shouldn't have anything
>> to do with that.  Executors shouldn't be auto-committing, that's why
>> it's being overridden.
>>
>> What you've said and the code you posted isn't really enough to
>> explain what your issue is, e.g.
>>
>> is this line
>> // save the rdd to Cassandra database
>> a blocking call
>>
>> are you sure that the rdd foreach isn't being retried and succeeding
>> the second time around, etc
>>
>> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>> <deshpandesh...@gmail.com> wrote:
>> > Hello All,
>> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
>> >
>> > I am setting enable.auto.commit to false, and manually want to commit
>> > the
>> > offsets after my output operation is successful. So when a exception is
>> > raised during during the processing I do not want the offsets to be
>> > committed. But looks like the offsets are automatically committed even
>> > when
>> > the exception is raised and thereby I am losing data.
>> > In my logs I see,  WARN  overriding enable.auto.commit to false for
>> > executor.  But I don't want it to override. Please help.
>> >
>> > My code looks like..
>> >
>> > val kafkaParams = Map[String, Object](
>> >   "bootstrap.servers" -> brokers,
>> >   "key.deserializer" -> classOf[StringDeserializer],
>> >   "value.deserializer" -> classOf[StringDeserializer],
>> >   "group.id" -> "Group1",
>> >   "auto.offset.reset" -> offsetresetparameter,
>> >   "enable.auto.commit" -> (false: java.lang.Boolean)
>> > )
>> >
>> > val myTopics = Array("topic1")
>> > val stream1 = KafkaUtils.createDirectStream[String, String](
>> >   ssc,
>> >   PreferConsistent,
>> >   Subscribe[String, String](myTopics, kafkaParams)
>> > )
>> >
>> > stream1.foreachRDD { (rdd, time) =>
>> > val offsetRanges =
>> > rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> > try {
>> > //save the rdd to Cassandra database
>> >
>> >
>> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
>> > } catch {
>> >   case ex: Exception => {
>> > println(ex.toString + "!! Bad Data, Unable to persist
>> > into
>> > table !" + errorOffsetRangesToString(offsetRanges))
>> >   }
>> > }
>> > }
>> >
>> > ssc.start()
>> > ssc.awaitTermination()
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-06 Thread Cody Koeninger
If your complaint is about offsets being committed that you didn't
expect... auto commit being false on executors shouldn't have anything
to do with that.  Executors shouldn't be auto-committing, that's why
it's being overridden.

What you've said and the code you posted isn't really enough to
explain what your issue is, e.g.

is this line
// save the rdd to Cassandra database
a blocking call

are you sure that the rdd foreach isn't being retried and succeeding
the second time around, etc

On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
 wrote:
> Hello All,
> I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
>
> I am setting enable.auto.commit to false, and manually want to commit the
> offsets after my output operation is successful. So when a exception is
> raised during during the processing I do not want the offsets to be
> committed. But looks like the offsets are automatically committed even when
> the exception is raised and thereby I am losing data.
> In my logs I see,  WARN  overriding enable.auto.commit to false for
> executor.  But I don't want it to override. Please help.
>
> My code looks like..
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> brokers,
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "group.id" -> "Group1",
>   "auto.offset.reset" -> offsetresetparameter,
>   "enable.auto.commit" -> (false: java.lang.Boolean)
> )
>
> val myTopics = Array("topic1")
> val stream1 = KafkaUtils.createDirectStream[String, String](
>   ssc,
>   PreferConsistent,
>   Subscribe[String, String](myTopics, kafkaParams)
> )
>
> stream1.foreachRDD { (rdd, time) =>
> val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> try {
> //save the rdd to Cassandra database
>
>   stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
> } catch {
>   case ex: Exception => {
> println(ex.toString + "!! Bad Data, Unable to persist into
> table !" + errorOffsetRangesToString(offsetRanges))
>   }
> }
> }
>
> ssc.start()
> ssc.awaitTermination()

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming giving me a bunch of WARNINGS, please help meunderstand them

2017-07-10 Thread Cody Koeninger
The warnings regarding configuration on the executor are for the
executor kafka consumer, not the driver kafka consumer.

In general, the executor kafka consumers should consume only exactly
the offsets the driver told them to, and not be involved in committing
offsets / part of the same group as the driver.  That's why those
configurations are forcibly overriden.

In short those warnings are normal, the main thing you should be
concerned about is the checkpoint.


On Mon, Jul 10, 2017 at 10:26 AM, shyla deshpande
 wrote:
> WARN  Use an existing SparkContext, some configuration may not take effect.
>  I wanted to restart the spark streaming app, so stopped the running
> and issued a new spark submit. Why and how it will use a existing
> SparkContext?
> => you are using checkpoint to restore the sparkcontext.
> => No, I am not using checkpoint for recovery. I need the checkpoint because
> I am doing stateful streaming.
>
> WARN  Spark is not running in local mode, therefore the checkpoint directory
> must not be on the local filesystem. Directory 'file:/efs/checkpoint'
> appears to be on the local filesystem.
> =>the CP path should be HDFSand so on. If you want to use local path, the
> cluster model should be local.
> => I am using the AWS EFS mount for checkpoint because I am running in
> standalone mode.
>
> WARN  overriding enable.auto.commit to false for executor
> =>stop the executor to commit the offset auto
> =>No, I don't want the autocommit. I do the commit later after my output
> operation using the commitAsync API
>
> WARN  overriding auto.offset.reset to none for executor
> =>it set the index where the executor read msg
> => I set this to none, because I want it to continue from where it left off
>
> WARN  overriding executor group.id to spark-executor-mygroupid
> => set the groupid of consumer. If you do not set, it will set a default and
> give a warning.
> => I am setting the groupid, mygroupid and it is adding spark-executor- as a
> prefix...
>
>
> On Mon, Jul 10, 2017 at 12:39 AM, 萝卜丝炒饭 <1427357...@qq.com> wrote:
>>
>> It seems you are usibg kafka 0.10.
>> See my comments below.
>>
>> ---Original---
>> From: "shyla deshpande"
>> Date: 2017/7/10 08:17:10
>> To: "user";
>> Subject: Spark streaming giving me a bunch of WARNINGS, please help
>> meunderstand them
>>
>> WARN  Use an existing SparkContext, some configuration may not take
>> effect.
>>  I wanted to restart the spark streaming app, so stopped the
>> running and issued a new spark submit. Why and how it will use a existing
>> SparkContext?
>> => you are using checkpoint to restore the sparkcontext.
>> WARN  Spark is not running in local mode, therefore the checkpoint
>> directory must not be on the local filesystem. Directory
>> 'file:/efs/checkpoint' appears to be on the local filesystem.
>> =>the CP path should be HDFSand so on.
>> If you want to use local path, the cluster model should be local.
>>
>>
>> WARN  overriding enable.auto.commit to false for executor
>> =>stop the executor to commit the offset auto.
>>
>> WARN  overriding auto.offset.reset to none for executor
>> =>it set the index where the executor read msg
>> WARN  overriding executor group.id to spark-executor-mygroupid
>> => set the groupid of consumer. If you do not set, it will set a default
>> and give a warning.
>>
>> WARN  overriding receive.buffer.bytes to 65536 see KAFKA-3135
>> WARN  overriding enable.auto.commit to false for executor
>> WARN  overriding auto.offset.reset to none for executor
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: The stability of Spark Stream Kafka 010

2017-06-29 Thread Cody Koeninger
Given the emphasis on structured streaming, I don't personally expect
a lot more work being put into DStreams-based projects, outside of
bugfixes.  Stable designation is kind of arbitrary at that point.

That 010 version wasn't developed until spark 2.0 timeframe, but you
can always try backporting it to spark 1.5.1





On Thu, Jun 29, 2017 at 11:07 AM, Martin Peng  wrote:
> Hi,
>
> We planned to upgrade our Spark Kafka library to 0.10 from 0.81 to simplify
> our infrastructure code logic. Does anybody know when will the 010 version
> become stable from experimental?
> May I use this 010 version together with Spark 1.5.1?
>
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> Thanks
> Martin

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark-Kafka integration - build failing with sbt

2017-06-19 Thread Cody Koeninger
org.apache.spark.streaming.kafka.KafkaUtils
is in the
spark-streaming-kafka-0-8
project

On Mon, Jun 19, 2017 at 1:01 PM, karan alang <karan.al...@gmail.com> wrote:
> Hi Cody - i do have a additional basic question ..
>
> When i tried to compile the code in Eclipse, i was not able to do that
>
> eg.
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> gave errors saying KafaUtils was not part of the package.
> However, when i used sbt to compile - the compilation went through fine
>
> So, I assume additional libraries are being downloaded when i provide the
> appropriate packages in LibraryDependencies ?
> which ones would have helped compile this ?
>
>
>
> On Sat, Jun 17, 2017 at 2:53 PM, karan alang <karan.al...@gmail.com> wrote:
>>
>> Thanks, Cody .. yes, was able to fix that.
>>
>> On Sat, Jun 17, 2017 at 1:18 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> There are different projects for different versions of kafka,
>>> spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10
>>>
>>> See
>>>
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>>
>>> On Fri, Jun 16, 2017 at 6:51 PM, karan alang <karan.al...@gmail.com>
>>> wrote:
>>> > I'm trying to compile kafka & Spark Streaming integration code i.e.
>>> > reading
>>> > from Kafka using Spark Streaming,
>>> >   and the sbt build is failing with error -
>>> >
>>> >   [error] (*:update) sbt.ResolveException: unresolved dependency:
>>> > org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
>>> >
>>> >   Scala version -> 2.10.7
>>> >   Spark Version -> 2.1.0
>>> >   Kafka version -> 0.9
>>> >   sbt version -> 0.13
>>> >
>>> > Contents of sbt files is as shown below ->
>>> >
>>> > 1)
>>> >   vi spark_kafka_code/project/plugins.sbt
>>> >
>>> >   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
>>> >
>>> >  2)
>>> >   vi spark_kafka_code/sparkkafka.sbt
>>> >
>>> > import AssemblyKeys._
>>> > assemblySettings
>>> >
>>> > name := "SparkKafka Project"
>>> >
>>> > version := "1.0"
>>> > scalaVersion := "2.11.7"
>>> >
>>> > val sparkVers = "2.1.0"
>>> >
>>> > // Base Spark-provided dependencies
>>> > libraryDependencies ++= Seq(
>>> >   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
>>> >   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
>>> >   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
>>> >
>>> > mergeStrategy in assembly := {
>>> >   case m if m.toLowerCase.endsWith("manifest.mf") =>
>>> > MergeStrategy.discard
>>> >   case m if m.toLowerCase.startsWith("META-INF")  =>
>>> > MergeStrategy.discard
>>> >   case "reference.conf"   =>
>>> > MergeStrategy.concat
>>> >   case m if m.endsWith("UnusedStubClass.class")   =>
>>> > MergeStrategy.discard
>>> >   case _ => MergeStrategy.first
>>> > }
>>> >
>>> >   i launch sbt, and then try to create an eclipse project, complete
>>> > error is
>>> > as shown below -
>>> >
>>> >   -
>>> >
>>> >   sbt
>>> > [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
>>> > [info] Loading project definition from
>>> >
>>> > /Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/project
>>> > [info] Set current project to SparkKafka Project (in build
>>> >
>>> > file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/)
>>> >> eclipse
>>> > [info] About to create Eclipse project files for your project(s).
>>> > [info] Updating
>>> >
>>> > {file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
>>> > [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
>>> > [warn] module not found:
>>

Re: Spark-Kafka integration - build failing with sbt

2017-06-17 Thread Cody Koeninger
There are different projects for different versions of kafka,
spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10

See

http://spark.apache.org/docs/latest/streaming-kafka-integration.html

On Fri, Jun 16, 2017 at 6:51 PM, karan alang  wrote:
> I'm trying to compile kafka & Spark Streaming integration code i.e. reading
> from Kafka using Spark Streaming,
>   and the sbt build is failing with error -
>
>   [error] (*:update) sbt.ResolveException: unresolved dependency:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
>
>   Scala version -> 2.10.7
>   Spark Version -> 2.1.0
>   Kafka version -> 0.9
>   sbt version -> 0.13
>
> Contents of sbt files is as shown below ->
>
> 1)
>   vi spark_kafka_code/project/plugins.sbt
>
>   addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
>
>  2)
>   vi spark_kafka_code/sparkkafka.sbt
>
> import AssemblyKeys._
> assemblySettings
>
> name := "SparkKafka Project"
>
> version := "1.0"
> scalaVersion := "2.11.7"
>
> val sparkVers = "2.1.0"
>
> // Base Spark-provided dependencies
> libraryDependencies ++= Seq(
>   "org.apache.spark" %% "spark-core" % sparkVers % "provided",
>   "org.apache.spark" %% "spark-streaming" % sparkVers % "provided",
>   "org.apache.spark" %% "spark-streaming-kafka" % sparkVers)
>
> mergeStrategy in assembly := {
>   case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
>   case m if m.toLowerCase.startsWith("META-INF")  => MergeStrategy.discard
>   case "reference.conf"   => MergeStrategy.concat
>   case m if m.endsWith("UnusedStubClass.class")   => MergeStrategy.discard
>   case _ => MergeStrategy.first
> }
>
>   i launch sbt, and then try to create an eclipse project, complete error is
> as shown below -
>
>   -
>
>   sbt
> [info] Loading global plugins from /Users/karanalang/.sbt/0.13/plugins
> [info] Loading project definition from
> /Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/project
> [info] Set current project to SparkKafka Project (in build
> file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/)
>> eclipse
> [info] About to create Eclipse project files for your project(s).
> [info] Updating
> {file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
> [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> [warn] module not found:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0
> [warn]  local: tried
> [warn]
> /Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-launcher-local: tried
> [warn]
> /Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-local: tried
> [warn]
> /Users/karanalang/Documents/Technology/SCALA/activator-dist-1.3.10/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  public: tried
> [warn]
> https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
> [warn]  typesafe-releases: tried
> [warn]
> http://repo.typesafe.com/typesafe/releases/org/apache/spark/spark-streaming-kafka_2.11/2.1.0/spark-streaming-kafka_2.11-2.1.0.pom
> [warn]  typesafe-ivy-releasez: tried
> [warn]
> http://repo.typesafe.com/typesafe/ivy-releases/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [info] Resolving jline#jline;2.12.1 ...
> [warn] ::
> [warn] ::  UNRESOLVED DEPENDENCIES ::
> [warn] ::
> [warn] :: org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> [warn] ::
> [warn]
> [warn] Note: Unresolved dependencies path:
> [warn] org.apache.spark:spark-streaming-kafka_2.11:2.1.0
> (/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/sparkkafka.sbt#L12-16)
> [warn]   +- sparkkafka-project:sparkkafka-project_2.11:1.0
> [trace] Stack trace suppressed: run last *:update for the full output.
> [error] (*:update) sbt.ResolveException: unresolved dependency:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0: not found
> [info] Updating
> {file:/Users/karanalang/Documents/Technology/Coursera_spark_scala/spark_kafka_code/}spark_kafka_code...
> [info] Resolving org.apache.spark#spark-streaming-kafka_2.11;2.1.0 ...
> [warn] module not found:
> org.apache.spark#spark-streaming-kafka_2.11;2.1.0
> [warn]  local: tried
> [warn]
> /Users/karanalang/.ivy2/local/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-launcher-local: tried
> [warn]
> /Users/karanalang/.activator/repository/org.apache.spark/spark-streaming-kafka_2.11/2.1.0/ivys/ivy.xml
> [warn]  activator-local: tried
> [warn]
> 

Re: Message getting lost in Kafka + Spark Streaming

2017-05-30 Thread Cody Koeninger
First thing I noticed, you should be using a singleton kafka producer,
not recreating one every partition.  It's threadsafe.

On Tue, May 30, 2017 at 7:59 AM, Vikash Pareek
 wrote:
> I am facing an issue related to spark streaming with kafka, my use case is as
> follow:
> 1. Spark streaming(DirectStream) application reading data/messages from
> kafka topic and process it
> 2. On the basis of proccessed message, app will write proccessed message to
> different kafka topics
> for e.g. if messgese is harmonized then write to harmonized topic else
> unharmonized topic
>
> the problem is that during the streaming somehow we are lossing some
> messaged i.e all the incoming messages are not written to harmonized or
> unharmonized topics.
> for e.g. if app received 30 messages in one batch then sometime it write all
> the messges to output topics(this is expected behaviour) but sometimes it
> writes only 27 (3 messages are lost, this number can change).
>
> Versions as follow:
> Spark 1.6.0
> Kafka 0.9
>
> Kafka topics confguration is as follow:
> # of brokers: 3
> # replicxation factor: 3
> # of paritions: 3
>
> Following are the properties we are using for kafka:
> *  val props = new Properties()
>   props.put("metadata.broker.list",
> properties.getProperty("metadataBrokerList"))
>   props.put("auto.offset.reset",
> properties.getProperty("autoOffsetReset"))
>   props.put("group.id", properties.getProperty("group.id"))
>   props.put("serializer.class", "kafka.serializer.StringEncoder")
>   props.put("outTopicHarmonized",
> properties.getProperty("outletKafkaTopicHarmonized"))
>   props.put("outTopicUnharmonized",
> properties.getProperty("outletKafkaTopicUnharmonized"))
>   props.put("acks", "all");
>   props.put("retries", "5");
>   props.put("request.required.acks", "-1")
> *
> Following is the piece of code where we are writing proccessed messges to
> kafka:
> *  val schemaRdd2 = finalHarmonizedDF.toJSON
>
>   schemaRdd2.foreachPartition { partition =>
> val producerConfig = new ProducerConfig(props)
> val producer = new Producer[String, String](producerConfig)
>
> partition.foreach { row =>
>   if (debug) println(row.mkString)
>   val keyedMessage = new KeyedMessage[String,
> String](props.getProperty("outTopicHarmonized"),
> null, row.toString())
>   producer.send(keyedMessage)
>
> }
> //hack, should be done with the flush
> Thread.sleep(1000)
> producer.close()
>   }
> *
> We explicitely added sleep(1000) for testing purpose.
> But this is also not solving the problem :(
>
> Any suggestion would be appreciated.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Message-getting-lost-in-Kafka-Spark-Streaming-tp28719.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: do we need to enable writeAheadLogs for DirectStream as well or is it only for indirect stream?

2017-05-02 Thread Cody Koeninger
You don't need write ahead logs for direct stream.

On Tue, May 2, 2017 at 11:32 AM, kant kodali  wrote:
> Hi All,
>
> I need some fault tolerance for my stateful computations and I am wondering
> why we need to enable writeAheadLogs for DirectStream like Kafka (for
> Indirect stream it makes sense). In case of driver failure DirectStream such
> as Kafka can pull the messages again from the last committed offset right?
>
> Thanks!

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Exactly-once semantics with kakfa CanCommitOffsets.commitAsync?

2017-04-28 Thread Cody Koeninger
It's asynchronous.  If your job stopped before the commit happened,
then of course it's not guaranteed to succeed.  But even if those
commits were somehow guaranteed to succeed even if your job stopped...
you still need idempotent output operations.  The point of
transactionality isn't that it's synchronous, it's that offsets and
output are stored in the same place, and guaranteed to either both
succeed or both fail.  Kafka can't give you that guarantee.  Some data
stores can.

With regard to doc clarity, that section starts by saying
"
Kafka delivery semantics in the case of failure depend on how and when
offsets are stored. Spark output operations are at-least-once. So if
you want the equivalent of exactly-once semantics, you must either
store offsets after an idempotent output, or store offsets in an
atomic transaction alongside output.
"

If you think that's still not clear, open a PR to contribute to the docs.



On Fri, Apr 28, 2017 at 10:47 AM, David Rosenstrauch <daro...@gmail.com> wrote:
> Yes, I saw that sentence too.  But it's rather short and not very
> explanatory, and there doesn't seem to be any further info available
> anywhere that expands on it.
>
> When I parse out that sentence:
>
> 1) "Kafka is not transactional" - i.e., the commits are done asynchronously,
> not synchronously.
> 2) "so your outputs must still be idempotent" - some of your commits may
> duplicate/overlap, so you need to be able to handle processing the same
> event(s) more than once.
>
> That doesn't quite make sense to me though.  I don't quite understand why #1
> implies #2.  Yes, Kafka isn't transactional - i.e., doesn't process my
> commits synchronously.  But it should be processing my commits *eventually*.
> If you look at my output from the previous message, even though I called
> commitAsync on 250959 -> 250962 in the first job, Kafka never actually
> processed those commits.  That's not an eventual/asynchronous commit; that's
> an optional commit.
>
> Is that in fact the semantics here - i.e., calls to commitAsync are not
> actually guaranteed to succeed?  If that's the case, the docs could really
> be a *lot* clearer about that.
>
> Thanks,
>
> DR
>
> On Fri, Apr 28, 2017 at 11:34 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> From that doc:
>>
>> " However, Kafka is not transactional, so your outputs must still be
>> idempotent. "
>>
>>
>>
>> On Fri, Apr 28, 2017 at 10:29 AM, David Rosenstrauch <daro...@gmail.com>
>> wrote:
>> > I'm doing a POC to test recovery with spark streaming from Kafka.  I'm
>> > using
>> > the technique for storing the offsets in Kafka, as described at:
>> >
>> >
>> > https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself
>> >
>> > I.e., grabbing the list of offsets before I start processing a batch of
>> > RDD's, and then committing them when I'm done.  The process pretty much
>> > works:  when I shut down my streaming process and then start it up
>> > again, it
>> > pretty much picks up where it left off.
>> >
>> > However, it looks like there's some overlap happening, where a few of
>> > the
>> > messages are being processed by both the old and the new streaming job
>> > runs.
>> > I.e., see the following log messages:
>> >
>> > End of old job run:
>> > 17/04/27 20:04:40 INFO KafkaRecoveryTester$: Committing rdd offsets:
>> > OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
>> > 250962]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
>> > 18]);OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
>> > 18]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 -> 17])
>> >
>> > Start of new job run:
>> > 17/04/27 20:56:50 INFO KafkaRecoveryTester$: Processing rdd with
>> > offsets:
>> > OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
>> > 100]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
>> > 100]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 ->
>> > 100]);OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
>> > 251044])
>> >
>> >
>> > Notice that in partition 0, for example, the 3 messages with offsets
>> > 250959
>> > through 250961 are being processed twice - once by the old job, and once
>> > by
>> > the new.  I would have expected that in the new run, the offset range
>> > for
>> > partition 0 would have been 250962 -> 251044, which would result in
>> > exactly-once semantics.
>> >
>> > Am I misunderstanding how this should work?  (I.e., exactly-once
>> > semantics
>> > is not possible here?)
>> >
>> > Thanks,
>> >
>> > DR
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Exactly-once semantics with kakfa CanCommitOffsets.commitAsync?

2017-04-28 Thread Cody Koeninger
>From that doc:

" However, Kafka is not transactional, so your outputs must still be
idempotent. "



On Fri, Apr 28, 2017 at 10:29 AM, David Rosenstrauch  wrote:
> I'm doing a POC to test recovery with spark streaming from Kafka.  I'm using
> the technique for storing the offsets in Kafka, as described at:
>
> https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself
>
> I.e., grabbing the list of offsets before I start processing a batch of
> RDD's, and then committing them when I'm done.  The process pretty much
> works:  when I shut down my streaming process and then start it up again, it
> pretty much picks up where it left off.
>
> However, it looks like there's some overlap happening, where a few of the
> messages are being processed by both the old and the new streaming job runs.
> I.e., see the following log messages:
>
> End of old job run:
> 17/04/27 20:04:40 INFO KafkaRecoveryTester$: Committing rdd offsets:
> OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
> 250962]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
> 18]);OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
> 18]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 -> 17])
>
> Start of new job run:
> 17/04/27 20:56:50 INFO KafkaRecoveryTester$: Processing rdd with offsets:
> OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
> 100]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
> 100]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 ->
> 100]);OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
> 251044])
>
>
> Notice that in partition 0, for example, the 3 messages with offsets 250959
> through 250961 are being processed twice - once by the old job, and once by
> the new.  I would have expected that in the new run, the offset range for
> partition 0 would have been 250962 -> 251044, which would result in
> exactly-once semantics.
>
> Am I misunderstanding how this should work?  (I.e., exactly-once semantics
> is not possible here?)
>
> Thanks,
>
> DR

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Cody Koeninger
If you're looking for some kind of instrumentation finer than at batch
boundaries, you'd have to do something with the individual messages
yourself.  You have full access to the individual messages including
offset.

On Thu, Apr 27, 2017 at 1:27 PM, Dominik Safaric
<dominiksafa...@gmail.com> wrote:
> Of course I am not asking to commit for every message. But instead of, 
> seeking to commit the last consumed offset at a given interval. For example, 
> from the 1st until the 5th second, messages until offset 100.000 of the 
> partition 10 were consumed, then from the 6th until the 10th second of 
> executing the last consumed offset of the same partition was 200.000 - and so 
> forth. This is the information I seek to get.
>
>> On 27 Apr 2017, at 20:11, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Are you asking for commits for every message?  Because that will kill
>> performance.
>>
>> On Thu, Apr 27, 2017 at 11:33 AM, Dominik Safaric
>> <dominiksafa...@gmail.com> wrote:
>>> Indeed I have. But, even when storing the offsets in Spark and committing 
>>> offsets upon completion of an output operation within the foreachRDD call 
>>> (as pointed in the example), the only offset that Spark’s Kafka 
>>> implementation commits to Kafka is the offset of the last message. For 
>>> example, if I have 100 million messages, then Spark will commit only the 
>>> 100 millionth offset, and the offsets of the intermediate batches - and 
>>> hence the questions.
>>>
>>>> On 26 Apr 2017, at 21:42, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>> have you read
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
>>>>
>>>> On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
>>>> <dominiksafa...@gmail.com> wrote:
>>>>> The reason why I want to obtain this information, i.e. <partition, 
>>>>> offset, timestamp> tuples is to relate the consumption with the 
>>>>> production rates using the __consumer_offsets Kafka internal topic. 
>>>>> Interestedly, the Spark’s KafkaConsumer implementation does not auto 
>>>>> commit the offsets upon offset commit expiration, because as seen in the 
>>>>> logs, Spark overrides the enable.auto.commit property to false.
>>>>>
>>>>> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
>>>>> mind that I do not care about exactly-once, hence having messages 
>>>>> replayed is perfectly fine.
>>>>>
>>>>>> On 26 Apr 2017, at 19:26, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>
>>>>>> What is it you're actually trying to accomplish?
>>>>>>
>>>>>> You can get topic, partition, and offset bounds from an offset range like
>>>>>>
>>>>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>>>>>
>>>>>> Timestamp isn't really a meaningful idea for a range of offsets.
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>>>>>> <dominiksafa...@gmail.com> wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Because the Spark Streaming direct Kafka consumer maps offsets for a 
>>>>>>> given
>>>>>>> Kafka topic and a partition internally while having enable.auto.commit 
>>>>>>> set
>>>>>>> to false, how can I retrieve the offset of each made consumer’s poll 
>>>>>>> call
>>>>>>> using the offset ranges of an RDD? More precisely, the information I 
>>>>>>> seek to
>>>>>>> get after each poll call is the following: <timestamp, offset, 
>>>>>>> partition>.
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>> Dominik
>>>>>>>
>>>>>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-27 Thread Cody Koeninger
Are you asking for commits for every message?  Because that will kill
performance.

On Thu, Apr 27, 2017 at 11:33 AM, Dominik Safaric
<dominiksafa...@gmail.com> wrote:
> Indeed I have. But, even when storing the offsets in Spark and committing 
> offsets upon completion of an output operation within the foreachRDD call (as 
> pointed in the example), the only offset that Spark’s Kafka implementation 
> commits to Kafka is the offset of the last message. For example, if I have 
> 100 million messages, then Spark will commit only the 100 millionth offset, 
> and the offsets of the intermediate batches - and hence the questions.
>
>> On 26 Apr 2017, at 21:42, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> have you read
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself
>>
>> On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
>> <dominiksafa...@gmail.com> wrote:
>>> The reason why I want to obtain this information, i.e. <partition, offset, 
>>> timestamp> tuples is to relate the consumption with the production rates 
>>> using the __consumer_offsets Kafka internal topic. Interestedly, the 
>>> Spark’s KafkaConsumer implementation does not auto commit the offsets upon 
>>> offset commit expiration, because as seen in the logs, Spark overrides the 
>>> enable.auto.commit property to false.
>>>
>>> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
>>> mind that I do not care about exactly-once, hence having messages replayed 
>>> is perfectly fine.
>>>
>>>> On 26 Apr 2017, at 19:26, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>> What is it you're actually trying to accomplish?
>>>>
>>>> You can get topic, partition, and offset bounds from an offset range like
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>>>
>>>> Timestamp isn't really a meaningful idea for a range of offsets.
>>>>
>>>>
>>>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>>>> <dominiksafa...@gmail.com> wrote:
>>>>> Hi all,
>>>>>
>>>>> Because the Spark Streaming direct Kafka consumer maps offsets for a given
>>>>> Kafka topic and a partition internally while having enable.auto.commit set
>>>>> to false, how can I retrieve the offset of each made consumer’s poll call
>>>>> using the offset ranges of an RDD? More precisely, the information I seek 
>>>>> to
>>>>> get after each poll call is the following: <timestamp, offset, partition>.
>>>>>
>>>>> Thanks in advance,
>>>>> Dominik
>>>>>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: help/suggestions to setup spark cluster

2017-04-27 Thread Cody Koeninger
You can just cap the cores used per job.

http://spark.apache.org/docs/latest/spark-standalone.html

http://spark.apache.org/docs/latest/spark-standalone.html#resource-scheduling

On Thu, Apr 27, 2017 at 1:07 AM, vincent gromakowski
<vincent.gromakow...@gmail.com> wrote:
> Spark standalone is not multi tenant you need one clusters per job. Maybe
> you can try fair scheduling and use one cluster but i doubt it will be prod
> ready...
>
> Le 27 avr. 2017 5:28 AM, "anna stax" <annasta...@gmail.com> a écrit :
>>
>> Thanks Cody,
>>
>> As I already mentioned I am running spark streaming on EC2 cluster in
>> standalone mode. Now in addition to streaming, I want to be able to run
>> spark batch job hourly and adhoc queries using Zeppelin.
>>
>> Can you please confirm that a standalone cluster is OK for this. Please
>> provide me some links to help me get started.
>>
>> Thanks
>> -Anna
>>
>> On Wed, Apr 26, 2017 at 7:46 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> The standalone cluster manager is fine for production.  Don't use Yarn
>>> or Mesos unless you already have another need for it.
>>>
>>> On Wed, Apr 26, 2017 at 4:53 PM, anna stax <annasta...@gmail.com> wrote:
>>> > Hi Sam,
>>> >
>>> > Thank you for the reply.
>>> >
>>> > What do you mean by
>>> > I doubt people run spark in a. Single EC2 instance, certainly not in
>>> > production I don't think
>>> >
>>> > What is wrong in having a data pipeline on EC2 that reads data from
>>> > kafka,
>>> > processes using spark and outputs to cassandra? Please explain.
>>> >
>>> > Thanks
>>> > -Anna
>>> >
>>> > On Wed, Apr 26, 2017 at 2:22 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> > wrote:
>>> >>
>>> >> Hi Anna
>>> >>
>>> >> There are a variety of options for launching spark clusters. I doubt
>>> >> people run spark in a. Single EC2 instance, certainly not in
>>> >> production I
>>> >> don't think
>>> >>
>>> >> I don't have enough information of what you are trying to do but if
>>> >> you
>>> >> are just trying to set things up from scratch then I think you can
>>> >> just use
>>> >> EMR which will create a cluster for you and attach a zeppelin instance
>>> >> as
>>> >> well
>>> >>
>>> >>
>>> >> You can also use databricks for ease of use and very little management
>>> >> but
>>> >> you will pay a premium for that abstraction
>>> >>
>>> >>
>>> >> Regards
>>> >> Sam
>>> >> On Wed, 26 Apr 2017 at 22:02, anna stax <annasta...@gmail.com> wrote:
>>> >>>
>>> >>> I need to setup a spark cluster for Spark streaming and scheduled
>>> >>> batch
>>> >>> jobs and adhoc queries.
>>> >>> Please give me some suggestions. Can this be done in standalone mode.
>>> >>>
>>> >>> Right now we have a spark cluster in standalone mode on AWS EC2
>>> >>> running
>>> >>> spark streaming application. Can we run spark batch jobs and zeppelin
>>> >>> on the
>>> >>> same. Do we need a better resource manager like Mesos?
>>> >>>
>>> >>> Are there any companies or individuals that can help in setting this
>>> >>> up?
>>> >>>
>>> >>> Thank you.
>>> >>>
>>> >>> -Anna
>>> >
>>> >
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: help/suggestions to setup spark cluster

2017-04-26 Thread Cody Koeninger
The standalone cluster manager is fine for production.  Don't use Yarn
or Mesos unless you already have another need for it.

On Wed, Apr 26, 2017 at 4:53 PM, anna stax  wrote:
> Hi Sam,
>
> Thank you for the reply.
>
> What do you mean by
> I doubt people run spark in a. Single EC2 instance, certainly not in
> production I don't think
>
> What is wrong in having a data pipeline on EC2 that reads data from kafka,
> processes using spark and outputs to cassandra? Please explain.
>
> Thanks
> -Anna
>
> On Wed, Apr 26, 2017 at 2:22 PM, Sam Elamin  wrote:
>>
>> Hi Anna
>>
>> There are a variety of options for launching spark clusters. I doubt
>> people run spark in a. Single EC2 instance, certainly not in production I
>> don't think
>>
>> I don't have enough information of what you are trying to do but if you
>> are just trying to set things up from scratch then I think you can just use
>> EMR which will create a cluster for you and attach a zeppelin instance as
>> well
>>
>>
>> You can also use databricks for ease of use and very little management but
>> you will pay a premium for that abstraction
>>
>>
>> Regards
>> Sam
>> On Wed, 26 Apr 2017 at 22:02, anna stax  wrote:
>>>
>>> I need to setup a spark cluster for Spark streaming and scheduled batch
>>> jobs and adhoc queries.
>>> Please give me some suggestions. Can this be done in standalone mode.
>>>
>>> Right now we have a spark cluster in standalone mode on AWS EC2 running
>>> spark streaming application. Can we run spark batch jobs and zeppelin on the
>>> same. Do we need a better resource manager like Mesos?
>>>
>>> Are there any companies or individuals that can help in setting this up?
>>>
>>> Thank you.
>>>
>>> -Anna
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-26 Thread Cody Koeninger
have you read

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself

On Wed, Apr 26, 2017 at 1:17 PM, Dominik Safaric
<dominiksafa...@gmail.com> wrote:
> The reason why I want to obtain this information, i.e. <partition, offset, 
> timestamp> tuples is to relate the consumption with the production rates 
> using the __consumer_offsets Kafka internal topic. Interestedly, the Spark’s 
> KafkaConsumer implementation does not auto commit the offsets upon offset 
> commit expiration, because as seen in the logs, Spark overrides the 
> enable.auto.commit property to false.
>
> Any idea onto how to use the KafkaConsumer’s auto offset commits? Keep in 
> mind that I do not care about exactly-once, hence having messages replayed is 
> perfectly fine.
>
>> On 26 Apr 2017, at 19:26, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> What is it you're actually trying to accomplish?
>>
>> You can get topic, partition, and offset bounds from an offset range like
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets
>>
>> Timestamp isn't really a meaningful idea for a range of offsets.
>>
>>
>> On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
>> <dominiksafa...@gmail.com> wrote:
>>> Hi all,
>>>
>>> Because the Spark Streaming direct Kafka consumer maps offsets for a given
>>> Kafka topic and a partition internally while having enable.auto.commit set
>>> to false, how can I retrieve the offset of each made consumer’s poll call
>>> using the offset ranges of an RDD? More precisely, the information I seek to
>>> get after each poll call is the following: <timestamp, offset, partition>.
>>>
>>> Thanks in advance,
>>> Dominik
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming 2.1 Kafka consumer - retrieving offset commits for each poll

2017-04-26 Thread Cody Koeninger
What is it you're actually trying to accomplish?

You can get topic, partition, and offset bounds from an offset range like

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#obtaining-offsets

Timestamp isn't really a meaningful idea for a range of offsets.


On Tue, Apr 25, 2017 at 2:43 PM, Dominik Safaric
 wrote:
> Hi all,
>
> Because the Spark Streaming direct Kafka consumer maps offsets for a given
> Kafka topic and a partition internally while having enable.auto.commit set
> to false, how can I retrieve the offset of each made consumer’s poll call
> using the offset ranges of an RDD? More precisely, the information I seek to
> get after each poll call is the following: .
>
> Thanks in advance,
> Dominik
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming to kafka exactly once

2017-03-22 Thread Cody Koeninger
If you're talking about reading the same message multiple times in a
failure situation, see

https://github.com/koeninger/kafka-exactly-once

If you're talking about producing the same message multiple times in a
failure situation, keep an eye on

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

If you're talking about producers just misbehaving and sending
different copies of what is essentially the same message from a domain
perspective, you have to dedupe that with your own logic.

On Wed, Mar 22, 2017 at 2:52 PM, Matt Deaver  wrote:
> You have to handle de-duplication upstream or downstream. It might
> technically be possible to handle this in Spark but you'll probably have a
> better time handling duplicates in the service that reads from Kafka.
>
> On Wed, Mar 22, 2017 at 1:49 PM, Maurin Lenglart 
> wrote:
>>
>> Hi,
>> we are trying to build a spark streaming solution that subscribe and push
>> to kafka.
>>
>> But we are running into the problem of duplicates events.
>>
>> Right now, I am doing a “forEachRdd” and loop over the message of each
>> partition and send those message to kafka.
>>
>>
>>
>> Is there any good way of solving that issue?
>>
>>
>>
>> thanks
>
>
>
>
> --
> Regards,
>
> Matt
> Data Engineer
> https://www.linkedin.com/in/mdeaver
> http://mattdeav.pythonanywhere.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Streaming+Kafka][How-to]

2017-03-22 Thread Cody Koeninger
Glad you got it worked out.  That's cool as long as your use case doesn't
actually require e.g. partition 0 to always be scheduled to the same
executor across different batches.

On Tue, Mar 21, 2017 at 7:35 PM, OUASSAIDI, Sami 
wrote:

> So it worked quite well with a coalesce, I was able to find an solution to
> my problem : Altough not directly handling the executor a good roundaway
> was to assign the desired partition to a specific stream through assign
> strategy and coalesce to a single partition then repeat the same process
> for the remaining topics on different streams and at the end do a an union
> of these streams.
>
> PS : No shuffle was made during the whole thing since the rdd partitions
> were collapsed to a single one
>
> Le 17 mars 2017 8:04 PM, "Michael Armbrust"  a
> écrit :
>
>> Another option that would avoid a shuffle would be to use assign and
>> coalesce, running two separate streams.
>>
>> spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "...")
>>   .option("assign", """{t0: {"0": }, t1:{"0": x}}""")
>>   .load()
>>   .coalesce(1)
>>   .writeStream
>>   .foreach(... code to write to cassandra ...)
>>
>> spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "...")
>>   .option("assign", """{t0: {"1": }, t1:{"1": x}}""")
>>   .load()
>>   .coalesce(1)
>>   .writeStream
>>   .foreach(... code to write to cassandra ...)
>>
>> On Fri, Mar 17, 2017 at 7:35 AM, OUASSAIDI, Sami > > wrote:
>>
>>> @Cody : Duly noted.
>>> @Michael Ambrust : A repartition is out of the question for our project
>>> as it would be a fairly expensive operation. We tried looking into
>>> targeting a specific executor so as to avoid this extra cost and directly
>>> have well partitioned data after consuming the kafka topics. Also we are
>>> using Spark streaming to save to the cassandra DB and try to keep shuffle
>>> operations to a strict minimum (at best none). As of now we are not
>>> entirely pleased with our current performances, that's why I'm doing a
>>> kafka topic sharding POC and getting the executor to handle the specificied
>>> partitions is central.
>>> ᐧ
>>>
>>> 2017-03-17 9:14 GMT+01:00 Michael Armbrust :
>>>
 Sorry, typo.  Should be a repartition not a groupBy.


> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("subscribe", "t0,t1")
>   .load()
>   .repartition($"partition")
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>

>>>
>>>
>>> --
>>> *Mind7 Consulting*
>>>
>>> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
>>> __
>>>
>>> 64 Rue Taitbout, 75009 Paris
>>>
>>
>>


Re: Spark Streaming from Kafka, deal with initial heavy load.

2017-03-20 Thread Cody Koeninger
You want spark.streaming.kafka.maxRatePerPartition for the direct stream.

On Sat, Mar 18, 2017 at 3:37 PM, Mal Edwin  wrote:
>
> Hi,
> You can enable backpressure to handle this.
>
> spark.streaming.backpressure.enabled
> spark.streaming.receiver.maxRate
>
> Thanks,
> Edwin
>
> On Mar 18, 2017, 12:53 AM -0400, sagarcasual . ,
> wrote:
>
> Hi, we have spark 1.6.1 streaming from Kafka (0.10.1) topic using direct
> approach. The streaming part works fine but when we initially start the job,
> we have to deal with really huge Kafka message backlog, millions of
> messages, and that first batch runs for over 40 hours,  and after 12 hours
> or so it becomes very very slow, it keeps crunching messages, but at a very
> low speed. Any idea how to overcome this issue? Once the job is all caught
> up, subsequent batches are quick and fast since the load is really tiny to
> process. So any idea how to avoid this problem?
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Streaming 2.1.0 - window vs. batch duration

2017-03-17 Thread Cody Koeninger
Probably easier if you show some more code, but if you just call
dstream.window(Seconds(60))
you didn't specify a slide duration, so it's going to default to your
batch duration of 1 second.
So yeah, if you're just using e.g. foreachRDD to output every message
in the window, every second it's going to output the last 60 seconds
of messages... which does mean each message will be output a total of
60 times.

Using a smaller window of 5 seconds for an example, 1 message per
second, notice that message 1489765610 will be output a total of 5
times

Window:
1489765605
1489765606
1489765607
1489765608
1489765609
Window:
1489765606
1489765607
1489765608
1489765609
1489765610
Window:
1489765607
1489765608
1489765609
1489765610
1489765611
Window:
1489765608
1489765609
1489765610
1489765611
1489765612
Window:
1489765609
1489765610
1489765611
1489765612
1489765613
Window:
1489765610
1489765611
1489765612
1489765613
1489765614
Window:
1489765611
1489765612
1489765613
1489765614
1489765615

On Thu, Mar 16, 2017 at 2:34 PM, Dominik Safaric
 wrote:
> Hi all,
>
> As I’ve implemented a streaming application pulling data from Kafka every 1
> second (batch interval), I am observing some quite strange behaviour (didn’t
> use Spark extensively in the past, but continuous operator based engines
> instead of).
>
> Namely the dstream.window(Seconds(60)) windowed stream when written back to
> Kafka contains more messages then they were consumed (for debugging purposes
> using a small dataset of a million Kafka byte array deserialized messages).
> In particular, in total I’ve streamed exactly 1 million messages, whereas
> upon window expiry 60 million messages are written back to Kafka.
>
> I’ve read on the official docs that both the window and window slide
> duration must be multiples of the batch interval. Does this mean that when
> consuming messages between two windows every batch interval the RDDs of a
> given batch interval t the same batch is being ingested 59 more times into
> the windowed stream?
>
> If I would like to achieve this behaviour (batch every being equal to a
> second, window duration 60 seconds) - how might one achieve this?
>
> I would appreciate if anyone could correct me if I got the internals of
> Spark’s windowed operations wrong and elaborate a bit.
>
> Thanks,
> Dominik

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Streaming+Kafka][How-to]

2017-03-16 Thread Cody Koeninger
Spark just really isn't a good fit for trying to pin particular computation
to a particular executor, especially if you're relying on that for
correctness.

On Thu, Mar 16, 2017 at 7:16 AM, OUASSAIDI, Sami 
wrote:

>
> Hi all,
>
> So I need to specify how an executor should consume data from a kafka
> topic.
>
> Let's say I have 2 topics : t0 and t1 with two partitions each, and two
> executors e0 and e1 (both can be on the same node so assign strategy does
> not work since in the case of a multi executor node it works based on round
> robin scheduling, whatever first available executor consumes the topic
> partition )
>
> What I would like to do is make e0 consume partition 0 from both t0 and t1
> while e1 consumes partition 1 from the t0 and t1. Is there no way around it
> except messing with scheduling ? If so what's the best approach.
>
> The reason for doing so is that executors will write to a cassandra
> database and since we will be in a parallelized context one executor might
> "collide" with another and therefore data will be lost, by assigning a
> partition I want to force the executor to process the data sequentially.
>
> Thanks
> Sami
> --
> *Mind7 Consulting*
>
> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
> __
>
> 64 Rue Taitbout, 75009 Paris
> ᐧ
>


Re: [Spark Kafka] API Doc pages for Kafka 0.10 not current

2017-02-28 Thread Cody Koeninger
The kafka-0-8 and kafka-0-10 integrations have conflicting
dependencies.  Last time I checked, Spark's doc publication puts
everything all in one classpath, so publishing them both together
won't work.  I thought there was already a Jira ticket related to
this, but a quick look didn't turn it up.

On Mon, Feb 27, 2017 at 3:01 PM, Afshartous, Nick
 wrote:
>
> Hello,
>
>
> Looks like the API docs linked from the Spark Kafka 0.10 Integration page
> are not current.
>
>
> For instance, on the page
>
>
>
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
>
> the code examples show the new API (i.e. class ConsumerStrategies).
> However, following the links
>
>
> API Docs --> (Scala | Java)
>
>
> leads to API pages that do not have class ConsumerStrategies) .  The API doc
> package names  also have streaming.kafka as opposed to streaming.kafka10.
>
>
> --
>
> Nick

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

2017-02-22 Thread Cody Koeninger
If you're talking about the version of scala used to build the broker,
that shouldn't matter.
If you're talking about the version of scala used for the kafka client
dependency, it shouldn't have compiled at all to begin with.

On Wed, Feb 22, 2017 at 12:11 PM, Muhammad Haseeb Javed
<11besemja...@seecs.edu.pk> wrote:
> I just noticed that Spark version that I am using (2.0.2) is built with
> Scala 2.11. However I am using Kafka 0.8.2 built with Scala 2.10. Could this
> be the reason why we are getting this error?
>
> On Mon, Feb 20, 2017 at 5:50 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> So there's no reason to use checkpointing at all, right?  Eliminate
>> that as a possible source of problems.
>>
>> Probably unrelated, but this also isn't a very good way to benchmark.
>> Kafka producers are threadsafe, there's no reason to create one for
>> each partition.
>>
>> On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
>> <11besemja...@seecs.edu.pk> wrote:
>> > This is the code that I have been trying is giving me this error. No
>> > complicated operation being performed on the topics as far as I can see.
>> >
>> > class Identity() extends BenchBase {
>> >
>> >
>> >   override def process(lines: DStream[(Long, String)], config:
>> > SparkBenchConfig): Unit = {
>> >
>> > val reportTopic = config.reporterTopic
>> >
>> > val brokerList = config.brokerList
>> >
>> >
>> > lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
>> >
>> >   val reporter = new KafkaReporter(reportTopic, brokerList)
>> >
>> >   partLines.foreach{ case (inTime , content) =>
>> >
>> > val outTime = System.currentTimeMillis()
>> >
>> > reporter.report(inTime, outTime)
>> >
>> > if(config.debugMode) {
>> >
>> >   println("Event: " + inTime + ", " + outTime)
>> >
>> > }
>> >
>> >   }
>> >
>> > }))
>> >
>> >   }
>> >
>> > }
>> >
>> >
>> > On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> That's an indication that the beginning offset for a given batch is
>> >> higher than the ending offset, i.e. something is seriously wrong.
>> >>
>> >> Are you doing anything at all odd with topics, i.e. deleting and
>> >> recreating them, using compacted topics, etc?
>> >>
>> >> Start off with a very basic stream over the same kafka topic that just
>> >> does foreach println or similar, with no checkpointing at all, and get
>> >> that working first.
>> >>
>> >> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
>> >> <11besemja...@seecs.edu.pk> wrote:
>> >> > Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
>> >> >
>> >> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
>> >> > <11besemja...@seecs.edu.pk> wrote:
>> >> >>
>> >> >> I am PhD student at Ohio State working on a study to evaluate
>> >> >> streaming
>> >> >> frameworks (Spark Streaming, Storm, Flink) using the the Intel
>> >> >> HiBench
>> >> >> benchmarks. But I think I am having a problem  with Spark. I have
>> >> >> Spark
>> >> >> Streaming application which I am trying to run on a 5 node cluster
>> >> >> (including master). I have 2 zookeeper and 4 kafka brokers. However,
>> >> >> whenever I run a Spark Streaming application I encounter the
>> >> >> following
>> >> >> error:
>> >> >>
>> >> >> java.lang.IllegalArgumentException: requirement failed: numRecords
>> >> >> must
>> >> >> not be negative
>> >> >> at scala.Predef$.require(Predef.scala:224)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.scheduler.StreamInputInfo.(InputInfoTracker.scala:38)
>> >> >> at
>> >> >>
>> >> >>
>> >> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
>> >> >> at
>> >> &g

Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

2017-02-20 Thread Cody Koeninger
So there's no reason to use checkpointing at all, right?  Eliminate
that as a possible source of problems.

Probably unrelated, but this also isn't a very good way to benchmark.
Kafka producers are threadsafe, there's no reason to create one for
each partition.

On Mon, Feb 20, 2017 at 4:43 PM, Muhammad Haseeb Javed
<11besemja...@seecs.edu.pk> wrote:
> This is the code that I have been trying is giving me this error. No
> complicated operation being performed on the topics as far as I can see.
>
> class Identity() extends BenchBase {
>
>
>   override def process(lines: DStream[(Long, String)], config:
> SparkBenchConfig): Unit = {
>
> val reportTopic = config.reporterTopic
>
> val brokerList = config.brokerList
>
>
> lines.foreachRDD(rdd => rdd.foreachPartition( partLines => {
>
>   val reporter = new KafkaReporter(reportTopic, brokerList)
>
>   partLines.foreach{ case (inTime , content) =>
>
> val outTime = System.currentTimeMillis()
>
> reporter.report(inTime, outTime)
>
> if(config.debugMode) {
>
>   println("Event: " + inTime + ", " + outTime)
>
> }
>
>   }
>
> }))
>
>   }
>
> }
>
>
> On Mon, Feb 20, 2017 at 3:10 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> That's an indication that the beginning offset for a given batch is
>> higher than the ending offset, i.e. something is seriously wrong.
>>
>> Are you doing anything at all odd with topics, i.e. deleting and
>> recreating them, using compacted topics, etc?
>>
>> Start off with a very basic stream over the same kafka topic that just
>> does foreach println or similar, with no checkpointing at all, and get
>> that working first.
>>
>> On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
>> <11besemja...@seecs.edu.pk> wrote:
>> > Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
>> >
>> > On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
>> > <11besemja...@seecs.edu.pk> wrote:
>> >>
>> >> I am PhD student at Ohio State working on a study to evaluate streaming
>> >> frameworks (Spark Streaming, Storm, Flink) using the the Intel HiBench
>> >> benchmarks. But I think I am having a problem  with Spark. I have Spark
>> >> Streaming application which I am trying to run on a 5 node cluster
>> >> (including master). I have 2 zookeeper and 4 kafka brokers. However,
>> >> whenever I run a Spark Streaming application I encounter the following
>> >> error:
>> >>
>> >> java.lang.IllegalArgumentException: requirement failed: numRecords must
>> >> not be negative
>> >> at scala.Predef$.require(Predef.scala:224)
>> >> at
>> >>
>> >> org.apache.spark.streaming.scheduler.StreamInputInfo.(InputInfoTracker.scala:38)
>> >> at
>> >>
>> >> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> >> at
>> >> scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> >> at
>> >>
>> >> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> >> at scala.Option.orElse(Option.scala:289)
>> >>
>> >> The application starts fine, but as soon as the Kafka producers start
>> >> emitting the stream data I start receiving the aforementioned error
>> >> repea

Re: Why does Spark Streaming application with Kafka fail with “requirement failed: numRecords must not be negative”?

2017-02-20 Thread Cody Koeninger
That's an indication that the beginning offset for a given batch is
higher than the ending offset, i.e. something is seriously wrong.

Are you doing anything at all odd with topics, i.e. deleting and
recreating them, using compacted topics, etc?

Start off with a very basic stream over the same kafka topic that just
does foreach println or similar, with no checkpointing at all, and get
that working first.

On Mon, Feb 20, 2017 at 12:10 PM, Muhammad Haseeb Javed
<11besemja...@seecs.edu.pk> wrote:
> Update: I am using Spark 2.0.2 and  Kafka 0.8.2 with Scala 2.10
>
> On Mon, Feb 20, 2017 at 1:06 PM, Muhammad Haseeb Javed
> <11besemja...@seecs.edu.pk> wrote:
>>
>> I am PhD student at Ohio State working on a study to evaluate streaming
>> frameworks (Spark Streaming, Storm, Flink) using the the Intel HiBench
>> benchmarks. But I think I am having a problem  with Spark. I have Spark
>> Streaming application which I am trying to run on a 5 node cluster
>> (including master). I have 2 zookeeper and 4 kafka brokers. However,
>> whenever I run a Spark Streaming application I encounter the following
>> error:
>>
>> java.lang.IllegalArgumentException: requirement failed: numRecords must
>> not be negative
>> at scala.Predef$.require(Predef.scala:224)
>> at
>> org.apache.spark.streaming.scheduler.StreamInputInfo.(InputInfoTracker.scala:38)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>> at
>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>> at scala.Option.orElse(Option.scala:289)
>>
>> The application starts fine, but as soon as the Kafka producers start
>> emitting the stream data I start receiving the aforementioned error
>> repeatedly.
>>
>> I have tried removing Spark Streaming checkpointing files as has been
>> suggested in similar posts on the internet. However, the problem persists
>> even if I start a Kafka topic and its corresponding consumer Spark Streaming
>> application for the first time. Also the problem could not be offset related
>> as I start the topic for the first time.
>>
>> Although the application seems to be processing the stream properly as I
>> can see by the benchmark numbers generated. However, the numbers are way of
>> from what I got for Storm and Flink, suspecting me to believe that there is
>> something wrong with the pipeline and Spark is not able to process the
>> stream as cleanly as it should. Any help in this regard would be really
>> appreciated.
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread Cody Koeninger
Not sure what to tell you at that point - maybe compare what is
present in ZK to a known working group.

On Tue, Feb 14, 2017 at 9:06 PM, Mohammad Kargar <mkar...@phemi.com> wrote:
> Yes offset nodes are in zk and I can get the values.
>
> On Feb 14, 2017 6:54 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>> Are there offset nodes in ZK?
>>
>> for a consumer group named mygroup, a topic named test, and partition
>> 0 you should be able to connect to ZK and do e.g.
>>
>> get /consumers/mygroup/offsets/test/0
>>
>> If those don't exist, those are the ZK nodes you would need to make
>> sure get created / updated from your spark job.
>>
>>
>>
>> On Tue, Feb 14, 2017 at 8:40 PM, Mohammad Kargar <mkar...@phemi.com>
>> wrote:
>> > I'm running 0.10 version and
>> >
>> > ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
>> >
>> >  lists the group.
>> >
>> > On Feb 14, 2017 6:34 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>> >>
>> >> As far as I know, kafka-consumer-groups.sh wasn't available with 0.8
>> >> versions of the kafka server.  Which version of the server are you
>> >> running?
>> >>
>> >> At any rate, the low-level consumer used by the
>> >> spark-streaming-kafka-0-8 integration isn't going to create a consumer
>> >> group or otherwise interact with them.  If
>> >>
>> >> ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
>> >>
>> >> doesn't show the group you're expecting, have you used a consumer that
>> >> would have created it?  Or did you explicitly try to create the
>> >> related ZK nodes yourself?
>> >>
>> >>
>> >> On Tue, Feb 14, 2017 at 8:07 PM, Mohammad Kargar <mkar...@phemi.com>
>> >> wrote:
>> >> > Tried
>> >> >
>> >> > ${kafka-home}/bin/kafka-consumer-groups.sh --zookeeper zhost:zp
>> >> > --describe
>> >> > --group mygroup
>> >> >
>> >> > and got
>> >> >
>> >> > No topic available for consumer group provided!
>> >> >
>> >> > Let me know if you still need the code (have to make changes before
>> >> > sharing).
>> >> >
>> >> > Thanks
>> >> >
>> >> >
>> >> > On Tue, Feb 14, 2017 at 5:57 PM, Cody Koeninger <c...@koeninger.org>
>> >> > wrote:
>> >> >>
>> >> >> Can you explain what wasn't successful and/or show code?
>> >> >>
>> >> >> On Tue, Feb 14, 2017 at 6:03 PM, Mohammad Kargar <mkar...@phemi.com>
>> >> >> wrote:
>> >> >> > As explained here, direct approach of integration between spark
>> >> >> > streaming
>> >> >> > and kafka does not update offsets in Zookeeper, hence
>> >> >> > Zookeeper-based
>> >> >> > Kafka
>> >> >> > monitoring tools will not show progress (details).  We followed
>> >> >> > the
>> >> >> > recommended workaround to update the zookeeper with the latest
>> >> >> > offset
>> >> >> > after
>> >> >> > each batch, but no success. Wondering if there's any end-to-end
>> >> >> > example
>> >> >> > we
>> >> >> > can use?
>> >> >> >
>> >> >> > Thanks,
>> >> >> > Mohammad
>> >> >> >
>> >> >
>> >> >

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread Cody Koeninger
Are there offset nodes in ZK?

for a consumer group named mygroup, a topic named test, and partition
0 you should be able to connect to ZK and do e.g.

get /consumers/mygroup/offsets/test/0

If those don't exist, those are the ZK nodes you would need to make
sure get created / updated from your spark job.



On Tue, Feb 14, 2017 at 8:40 PM, Mohammad Kargar <mkar...@phemi.com> wrote:
> I'm running 0.10 version and
>
> ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
>
>  lists the group.
>
> On Feb 14, 2017 6:34 PM, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>> As far as I know, kafka-consumer-groups.sh wasn't available with 0.8
>> versions of the kafka server.  Which version of the server are you
>> running?
>>
>> At any rate, the low-level consumer used by the
>> spark-streaming-kafka-0-8 integration isn't going to create a consumer
>> group or otherwise interact with them.  If
>>
>> ./bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
>>
>> doesn't show the group you're expecting, have you used a consumer that
>> would have created it?  Or did you explicitly try to create the
>> related ZK nodes yourself?
>>
>>
>> On Tue, Feb 14, 2017 at 8:07 PM, Mohammad Kargar <mkar...@phemi.com>
>> wrote:
>> > Tried
>> >
>> > ${kafka-home}/bin/kafka-consumer-groups.sh --zookeeper zhost:zp
>> > --describe
>> > --group mygroup
>> >
>> > and got
>> >
>> > No topic available for consumer group provided!
>> >
>> > Let me know if you still need the code (have to make changes before
>> > sharing).
>> >
>> > Thanks
>> >
>> >
>> > On Tue, Feb 14, 2017 at 5:57 PM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> Can you explain what wasn't successful and/or show code?
>> >>
>> >> On Tue, Feb 14, 2017 at 6:03 PM, Mohammad Kargar <mkar...@phemi.com>
>> >> wrote:
>> >> > As explained here, direct approach of integration between spark
>> >> > streaming
>> >> > and kafka does not update offsets in Zookeeper, hence Zookeeper-based
>> >> > Kafka
>> >> > monitoring tools will not show progress (details).  We followed the
>> >> > recommended workaround to update the zookeeper with the latest offset
>> >> > after
>> >> > each batch, but no success. Wondering if there's any end-to-end
>> >> > example
>> >> > we
>> >> > can use?
>> >> >
>> >> > Thanks,
>> >> > Mohammad
>> >> >
>> >
>> >

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Write JavaDStream to Kafka (how?)

2017-02-14 Thread Cody Koeninger
It looks like you're creating a kafka producer on the driver, and
attempting to write the string representation of
stringIntegerJavaPairRDD.  Instead, you probably want to be calling
stringIntegerJavaPairRDD.foreachPartition, so that producing to kafka
is being done on the executor.

Read

https://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

On Fri, Feb 10, 2017 at 4:08 AM, Gutwein, Sebastian
 wrote:
> Hi,
>
>
> I'am new to Spark-Streaming and want to run some end-to-end-tests with Spark
> and Kafka.
>
> My program is running but at the kafka topic nothing arrives. Can someone
> please help me?
>
> Where is my mistake, has someone a runnig example of writing a DStream to
> Kafka 0.10.1.0?
>
>
> The program looks like follows:
>
> import kafka.Kafka;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.*;
> import org.apache.spark.rdd.RDD;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.streaming.api.java.JavaPairDStream;
> import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import scala.Int;
> import scala.Tuple2;
>
> import java.util.*;
> import java.util.regex.Pattern;
>
> /**
>  * Consumes messages from one or more topics in Kafka and does wordcount.
>  *
>  * Usage: JavaKafkaWordCount
>  *is a list of one or more zookeeper servers that make quorum
>  *is the name of kafka consumer group
>  *is a list of one or more kafka topics to consume from
>  *is the number of threads the kafka consumer should use
>  *
>  * To run this example:
>  *   `$ bin/run-example
> org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
>  *zoo03 my-consumer-group topic1,topic2 1`
>  */
>
> public final class JavaKafkaWordCountTest {
>   private static final Pattern SPACE = Pattern.compile(" ");
>
>   private JavaKafkaWordCountTest() {
>   }
>
>   public static void main(String[] args) throws Exception {
> if (args.length < 4) {
>   System.err.println("Usage: JavaKafkaWordCount  
>  ");
>   System.exit(1);
> }
>
> SparkConf sparkConf = new
> SparkConf().setAppName("GutweinKafkaWordCount");
> // Create the context with 2 seconds batch size
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
> Duration(2000));
>
> int numThreads = Integer.parseInt(args[3]);
> Map topicMap = new HashMap<>();
> String[] topics = args[2].split(",");
> for (String topic: topics) {
>   topicMap.put(topic, numThreads);
> }
>
> final JavaPairReceiverInputDStream messages =
> KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
>
> JavaDStream lines = messages.map(new Function String>, String>() {
>   @Override
>   public String call(Tuple2 tuple2) {
> return tuple2._2();
>   }
> });
>
> JavaDStream words = lines.flatMap(new FlatMapFunction String>() {
>   @Override
>   public Iterator call(String x) {
> return Arrays.asList(SPACE.split(x)).iterator();
>   }
> });
>
> JavaPairDStream wordCounts = words.mapToPair(
>   new PairFunction() {
> @Override
> public Tuple2 call(String s) {
>   return new Tuple2<>(s, 1);
> }
>   }).reduceByKey(new Function2() {
> @Override
> public Integer call(Integer i1, Integer i2) {
>   return i1 + i2;
> }
>   });
>
> final KafkaWriter writer = new KafkaWriter("localhost:9081");
>
> wordCounts.foreachRDD(new VoidFunction>() {
> @Override
> public void call(JavaPairRDD
> stringIntegerJavaPairRDD) throws Exception {
> writer.writeToTopic("output",
> stringIntegerJavaPairRDD.toString());
> }
> });
>
> wordCounts.print();
> jssc.start();
> jssc.awaitTermination();
>   }
>
>   public static class KafkaWriter {
> Properties props = new Properties();
> KafkaProducer producer;
>
> // Constructor
> KafkaWriter(String bootstrap_server){
>   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server);
>   props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> "org.apache.kafka.common.serialization.StringSerializer");
>   

Re: streaming-kafka-0-8-integration (direct approach) and monitoring

2017-02-14 Thread Cody Koeninger
Can you explain what wasn't successful and/or show code?

On Tue, Feb 14, 2017 at 6:03 PM, Mohammad Kargar  wrote:
> As explained here, direct approach of integration between spark streaming
> and kafka does not update offsets in Zookeeper, hence Zookeeper-based Kafka
> monitoring tools will not show progress (details).  We followed the
> recommended workaround to update the zookeeper with the latest offset after
> each batch, but no success. Wondering if there's any end-to-end example we
> can use?
>
> Thanks,
> Mohammad
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.0 Scala 2.11 and Kafka 0.10 Scala 2.10

2017-02-08 Thread Cody Koeninger
Pretty sure there was no 0.10.0.2 release of apache kafka.  If that's
a hortonworks modified version you may get better results asking in a
hortonworks specific forum.  Scala version of kafka shouldn't be
relevant either way though.

On Wed, Feb 8, 2017 at 5:30 PM, u...@moosheimer.com  wrote:
> Dear devs,
>
> is it possible to use Spark 2.0.2 Scala 2.11 and consume messages from kafka
> server 0.10.0.2 running on Scala 2.10?
> I tried this the last two days by using createDirectStream and can't get no
> message out of kafka?!
>
> I'm using HDP 2.5.3 running kafka_2.10-0.10.0.2.5.3.0-37 and Spark 2.0.2.
>
> Uwe
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SSpark streaming: Could not initialize class kafka.consumer.FetchRequestAndResponseStatsRegistry$

2017-02-06 Thread Cody Koeninger
You should not need to include jars for Kafka, the spark connectors
have the appropriate transitive dependency on the correct version.

On Sat, Feb 4, 2017 at 3:25 PM, Marco Mistroni  wrote:
> Hi
>  not sure if this will help at all, and pls take it with a pinch of salt as
> i dont have your setup and i am not running on a cluster
>
>  I have tried to run a kafka example which was originally workkign on spark
> 1.6.1 on spark 2.
> These are the jars i am using
>
> spark-streaming-kafka-0-10_2.11_2.0.1.jar
>
> kafka_2.11-0.10.1.1
>
>
> And here's the code up to the creation of the Direct Stream. apparently with
> the new version of kafka libs some properties have to be specified
>
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.storage.StorageLevel
>
> import java.util.regex.Pattern
> import java.util.regex.Matcher
>
> import Utilities._
>
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import kafka.serializer.StringDecoder
> import
> org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
> /** Working example of listening for log data from Kafka's testLogs topic on
> port 9092. */
> object KafkaExample {
>
>   def main(args: Array[String]) {
>
> // Create the context with a 1 second batch size
> val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))
>
> setupLogging()
>
> // Construct a regular expression (regex) to extract fields from raw
> Apache log lines
> val pattern = apacheLogPattern()
>
> val kafkaParams = Map("metadata.broker.list" -> "localhost:9092",
> "bootstrap.servers" -> "localhost:9092",
> "key.deserializer"
> ->"org.apache.kafka.common.serialization.StringDeserializer",
> "value.deserializer"
> ->"org.apache.kafka.common.serialization.StringDeserializer",
> "group.id" -> "group1")
> val topics = List("testLogs").toSet
> val lines = KafkaUtils.createDirectStream[String, String](
> ssc,
> PreferConsistent,
> Subscribe[String,
> String](topics, kafkaParams)
>   ).map(cr => cr.value())
>
> hth
>
>  marco
>
>
>
>
>
>
>
>
>
>
>
>
> On Sat, Feb 4, 2017 at 8:33 PM, Mich Talebzadeh 
> wrote:
>>
>> I am getting this error with Spark 2. which works with CDH 5.5.1 (Spark
>> 1.5).
>>
>> Admittedly I am messing around with Spark-shell. However, I am surprised
>> why this does not work with Spark 2 and is ok with CDH 5.1
>>
>> scala> val dstream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
>>
>> java.lang.NoClassDefFoundError: Could not initialize class
>> kafka.consumer.FetchRequestAndResponseStatsRegistry$
>>   at kafka.consumer.SimpleConsumer.(SimpleConsumer.scala:39)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.connect(KafkaCluster.scala:52)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:345)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers$1.apply(KafkaCluster.scala:342)
>>   at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>   at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.org$apache$spark$streaming$kafka$KafkaCluster$$withBrokers(KafkaCluster.scala:342)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:125)
>>   at
>> org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:112)
>>   at
>> org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
>>   at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
>>   ... 74 elided
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> Disclaimer: Use it at your own risk. Any and all responsibility for any
>> loss, damage or destruction of data or any other property which may arise
>> from relying on this email's technical content is explicitly disclaimed. The
>> author will in no case be liable for any monetary damages arising from such
>> loss, damage or destruction.
>>
>>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka dependencies in Eclipse project /Pls assist

2017-01-31 Thread Cody Koeninger
spark-streaming-kafka-0-10 has a transitive dependency on the kafka
library, you shouldn't need to include kafka explicitly.

What's your actual list of dependencies?

On Tue, Jan 31, 2017 at 3:49 PM, Marco Mistroni  wrote:
> HI all
>   i am trying to run a sample spark code which reads streaming data from
> Kafka
> I Have followed instructions here
>
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
>
> Here's my setup
> Spark: 2.0.1
> Kafka:0.10.1.1
> Scala Version: 2.11
>
>
>
> Libraries used
> - spark-streaming-kafka-0.10_2.11-2.0.1
> - kafka-_2.11-0.10.0.1.jar
>
> These are my imports
>
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import
> org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
> But Eclipse is giving me the following errors:
> Missing or invlaid dependency detected while loading class file
> KafkaUtils.class. Could not access term clients in value org.apache.kafka
> because it (or its dependencies) are missing.
> Missing or invalid dependency detected while loading class file
> KafkaUtils.class Could not access term kafka in package org.apache because
> it (or its dependencies are missing)
> missing or invalid dependencies detected while loading class file
> KafkaUtils.class: could not access type ConsumerRecord in value
> org.apache.consumer because it(or its dependencies are missing)
>
> So it seems i have some dependencies clashing. Has any one encountered a
> similar error?
>
> kr
>  marco

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: mapWithState question

2017-01-30 Thread Cody Koeninger
Keep an eye on

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

although it'll likely be a while

On Mon, Jan 30, 2017 at 3:41 PM, Tathagata Das
 wrote:
> If you care about the semantics of those writes to Kafka, then you should be
> aware of two things.
> 1. There are no transactional writes to Kafka.
> 2. So, when tasks get reexecuted due to any failure, your mapping function
> will also be reexecuted, and the writes to kafka can happen multiple times.
> So you may only get at least once guarantee about those Kafka writes
>
>
> On Mon, Jan 30, 2017 at 10:02 AM, shyla deshpande 
> wrote:
>>
>> Hello,
>>
>> TD, your suggestion works great. Thanks
>>
>> I have 1 more question, I need to write to kafka from within the
>> mapWithState function. Just wanted to check if this a bad pattern in any
>> way.
>>
>> Thank you.
>>
>>
>>
>>
>>
>> On Sat, Jan 28, 2017 at 9:14 AM, shyla deshpande
>>  wrote:
>>>
>>> Thats a great idea. I will try that. Thanks.
>>>
>>> On Sat, Jan 28, 2017 at 2:35 AM, Tathagata Das
>>>  wrote:

 1 state object for each user.
 union both streams into a single DStream, and apply mapWithState on it
 to update the user state.

 On Sat, Jan 28, 2017 at 12:30 AM, shyla deshpande
  wrote:
>
> Can multiple DStreams manipulate a state? I have a stream that gives me
> total minutes the user spent on a course material. I have another stream
> that gives me chapters completed and lessons completed by the user. I want
> to keep track for each user total_minutes, chapters_completed and
> lessons_completed. I am not sure if I should have 1 state or 2 states. 
> Can I
> lookup the state for a given key just like a map outside the mapfunction?
>
> Appreciate your input. Thanks


>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
If you haven't looked at the offset ranges in the logs for the time period
in question, I'd start there.

On Jan 24, 2017 2:51 PM, "Hakan İlter" <hakanil...@gmail.com> wrote:

Sorry for misunderstanding. When I said that, I meant there are no lag in
consumer. Kafka Manager shows each consumer's coverage and lag status.

On Tue, Jan 24, 2017 at 10:45 PM, Cody Koeninger <c...@koeninger.org> wrote:

> When you said " I check the offset ranges from Kafka Manager and don't
> see any significant deltas.", what were you comparing it against?  The
> offset ranges printed in spark logs?
>
> On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter <hakanil...@gmail.com> wrote:
> > First of all, I can both see the "Input Rate" from Spark job's statistics
> > page and Kafka producer message/sec from Kafka manager. The numbers are
> > different when I have the problem. Normally these numbers are very near.
> >
> > Besides, the job is an ETL job, it writes the results to Elastic Search.
> An
> > another legacy app also writes the same results to a database. There are
> > huge difference between DB and ES. I know how many records we process
> daily.
> >
> > Everything works fine if I run a job instance for each topic.
> >
> > On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> I'm confused, if you don't see any difference between the offsets the
> >> job is processing and the offsets available in kafka, then how do you
> >> know it's processing less than all of the data?
> >>
> >> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com>
> >> wrote:
> >> > I'm using DirectStream as one stream for all topics. I check the
> offset
> >> > ranges from Kafka Manager and don't see any significant deltas.
> >> >
> >> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org>
> >> > wrote:
> >> >>
> >> >> Are you using receiver-based or direct stream?
> >> >>
> >> >> Are you doing 1 stream per topic, or 1 stream for all topics?
> >> >>
> >> >> If you're using the direct stream, the actual topics and offset
> ranges
> >> >> should be visible in the logs, so you should be able to see more
> >> >> detail about what's happening (e.g. all topics are still being
> >> >> processed but offsets are significantly behind, vs only certain
> topics
> >> >> being processed but keeping up with latest offsets)
> >> >>
> >> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com>
> >> >> wrote:
> >> >> > Hi everyone,
> >> >> >
> >> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
> >> >> > from
> >> >> > multiple kafka topics. After starting the job, everything works
> fine
> >> >> > first
> >> >> > (like 700 req/sec) but after a while (couples of days or a week) it
> >> >> > starts
> >> >> > processing only some part of the data (like 350 req/sec). When I
> >> >> > check
> >> >> > the
> >> >> > kafka topics, I can see that there are still 700 req/sec coming to
> >> >> > the
> >> >> > topics. I don't see any errors, exceptions or any other problem.
> The
> >> >> > job
> >> >> > works fine when I start the same code with just single kafka topic.
> >> >> >
> >> >> > Do you have any idea or a clue to understand the problem?
> >> >> >
> >> >> > Thanks.
> >> >> >
> >> >> >
> >> >> >
> >> >> > --
> >> >> > View this message in context:
> >> >> >
> >> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-st
> reaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> >> >> > Sent from the Apache Spark User List mailing list archive at
> >> >> > Nabble.com.
> >> >> >
> >> >> > 
> -
> >> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >> >
> >> >
> >> >
> >
> >
>


Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
When you said " I check the offset ranges from Kafka Manager and don't
see any significant deltas.", what were you comparing it against?  The
offset ranges printed in spark logs?

On Tue, Jan 24, 2017 at 2:11 PM, Hakan İlter <hakanil...@gmail.com> wrote:
> First of all, I can both see the "Input Rate" from Spark job's statistics
> page and Kafka producer message/sec from Kafka manager. The numbers are
> different when I have the problem. Normally these numbers are very near.
>
> Besides, the job is an ETL job, it writes the results to Elastic Search. An
> another legacy app also writes the same results to a database. There are
> huge difference between DB and ES. I know how many records we process daily.
>
> Everything works fine if I run a job instance for each topic.
>
> On Tue, Jan 24, 2017 at 5:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> I'm confused, if you don't see any difference between the offsets the
>> job is processing and the offsets available in kafka, then how do you
>> know it's processing less than all of the data?
>>
>> On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com>
>> wrote:
>> > I'm using DirectStream as one stream for all topics. I check the offset
>> > ranges from Kafka Manager and don't see any significant deltas.
>> >
>> > On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> Are you using receiver-based or direct stream?
>> >>
>> >> Are you doing 1 stream per topic, or 1 stream for all topics?
>> >>
>> >> If you're using the direct stream, the actual topics and offset ranges
>> >> should be visible in the logs, so you should be able to see more
>> >> detail about what's happening (e.g. all topics are still being
>> >> processed but offsets are significantly behind, vs only certain topics
>> >> being processed but keeping up with latest offsets)
>> >>
>> >> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com>
>> >> wrote:
>> >> > Hi everyone,
>> >> >
>> >> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data
>> >> > from
>> >> > multiple kafka topics. After starting the job, everything works fine
>> >> > first
>> >> > (like 700 req/sec) but after a while (couples of days or a week) it
>> >> > starts
>> >> > processing only some part of the data (like 350 req/sec). When I
>> >> > check
>> >> > the
>> >> > kafka topics, I can see that there are still 700 req/sec coming to
>> >> > the
>> >> > topics. I don't see any errors, exceptions or any other problem. The
>> >> > job
>> >> > works fine when I start the same code with just single kafka topic.
>> >> >
>> >> > Do you have any idea or a clue to understand the problem?
>> >> >
>> >> > Thanks.
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > View this message in context:
>> >> >
>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
>> >> > Sent from the Apache Spark User List mailing list archive at
>> >> > Nabble.com.
>> >> >
>> >> > -
>> >> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-24 Thread Cody Koeninger
I'm confused, if you don't see any difference between the offsets the
job is processing and the offsets available in kafka, then how do you
know it's processing less than all of the data?

On Tue, Jan 24, 2017 at 12:35 AM, Hakan İlter <hakanil...@gmail.com> wrote:
> I'm using DirectStream as one stream for all topics. I check the offset
> ranges from Kafka Manager and don't see any significant deltas.
>
> On Tue, Jan 24, 2017 at 4:42 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Are you using receiver-based or direct stream?
>>
>> Are you doing 1 stream per topic, or 1 stream for all topics?
>>
>> If you're using the direct stream, the actual topics and offset ranges
>> should be visible in the logs, so you should be able to see more
>> detail about what's happening (e.g. all topics are still being
>> processed but offsets are significantly behind, vs only certain topics
>> being processed but keeping up with latest offsets)
>>
>> On Mon, Jan 23, 2017 at 3:14 PM, hakanilter <hakanil...@gmail.com> wrote:
>> > Hi everyone,
>> >
>> > I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
>> > multiple kafka topics. After starting the job, everything works fine
>> > first
>> > (like 700 req/sec) but after a while (couples of days or a week) it
>> > starts
>> > processing only some part of the data (like 350 req/sec). When I check
>> > the
>> > kafka topics, I can see that there are still 700 req/sec coming to the
>> > topics. I don't see any errors, exceptions or any other problem. The job
>> > works fine when I start the same code with just single kafka topic.
>> >
>> > Do you have any idea or a clue to understand the problem?
>> >
>> > Thanks.
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Failure handling

2017-01-24 Thread Cody Koeninger
Can you identify the error case and call System.exit ?  It'll get
retried on another executor, but as long as that one fails the same
way...

If you can identify the error case at the time you're doing database
interaction and just prevent data being written then, that's what I
typically do.

On Tue, Jan 24, 2017 at 7:50 AM, Erwan ALLAIN  wrote:
> Hello guys,
>
> I have a question regarding how spark handle failure.
>
> I’m using kafka direct stream
> Spark 2.0.2
> Kafka 0.10.0.1
>
> Here is a snippet of code
>
> val stream = createDirectStream(….)
>
> stream
>  .map(…)
> .forEachRDD( doSomething)
>
> stream
> .map(…)
> .forEachRDD( doSomethingElse)
>
> The execution is in FIFO, so the first action ends after the second starts
> so far so good.
> However, I would like that when an error (fatal or not) occurs during the
> execution of the first action, the streaming context is stopped immediately.
> It's like the driver is not notified of the exception and launch the second
> action.
>
> In our case, the second action is performing checkpointing in an external
> database and we do not want to checkpoint if an error occurs before.
> We do not want to rely on spark checkpoint as it causes issue when upgrading
> application.
>
> Let me know if it’s not clear !
>
> Thanks !
>
> Erwan

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming multiple kafka topic doesn't work at-least-once

2017-01-23 Thread Cody Koeninger
Are you using receiver-based or direct stream?

Are you doing 1 stream per topic, or 1 stream for all topics?

If you're using the direct stream, the actual topics and offset ranges
should be visible in the logs, so you should be able to see more
detail about what's happening (e.g. all topics are still being
processed but offsets are significantly behind, vs only certain topics
being processed but keeping up with latest offsets)

On Mon, Jan 23, 2017 at 3:14 PM, hakanilter  wrote:
> Hi everyone,
>
> I have a spark (1.6.0-cdh5.7.1) streaming job which receives data from
> multiple kafka topics. After starting the job, everything works fine first
> (like 700 req/sec) but after a while (couples of days or a week) it starts
> processing only some part of the data (like 350 req/sec). When I check the
> kafka topics, I can see that there are still 700 req/sec coming to the
> topics. I don't see any errors, exceptions or any other problem. The job
> works fine when I start the same code with just single kafka topic.
>
> Do you have any idea or a clue to understand the problem?
>
> Thanks.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-multiple-kafka-topic-doesn-t-work-at-least-once-tp28334.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Assembly for Kafka >= 0.10.0, Spark 2.2.0, Scala 2.11

2017-01-18 Thread Cody Koeninger
Spark 2.2 hasn't been released yet, has it?

Python support in kafka dstreams for 0.10 is probably never, there's a
jira ticket about this.

Stable, hard to say.  It was quite a few releases before 0.8 was
marked stable, even though it underwent little change.

On Wed, Jan 18, 2017 at 2:21 AM, Karamba  wrote:
> |Hi, I am looking for an assembly for Spark 2.2.0 with Scala 2.11. I
> can't find one in MVN Repository. Moreover, "org.apache.spark" %%
> "spark-streaming-kafka-0-10_2.11" % "2.1.0 shows that even sbt does not
> find one: [error] (*:update) sbt.ResolveException: unresolved
> dependency: org.apache.spark#spark-streaming-kafka-0-10_2.11_2.11;2.1.0:
> not found Where do I find that a library? Thanks and best regards,
> karamba PS: Does anybody know when python support becomes available in
> spark-streaming-kafka-0-10 and when it will reach "stable"? |
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka 0.8 + Spark 2.0 Partition Issue

2017-01-06 Thread Cody Koeninger
Kafka is designed to only allow reads from leaders.  You need to fix
this at the kafka level not the spark level.

On Fri, Jan 6, 2017 at 7:33 AM, Raghu Vadapalli  wrote:
>
> My spark 2.0 +  kafka 0.8 streaming job fails with error partition leaderset
> exception. When I check the kafka topic the partition, it is indeed in error
> with Leader = -1 and empty ISR.  I did lot of google and all of them point
> to either restarting or deleting the topic.  To do any of those two in
> production system while other topics are in heavy use is next to impossible.
> Now my question, is there way to force spark to read from leaderless
> partition accepting some dataloss or inconsistency ? Or force the immediate
> sync followed by election ( a kafka users group question but I am pushing my
> luck:) here )
>
> Topic: vzecsapplog Partition: 8 Leader: -1 Replicas: 5,4 Isr:
>
> --
> Regards,
> Raghu
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark Kafka] How to update batch size of input dynamically for spark kafka consumer?

2017-01-03 Thread Cody Koeninger
You can't change the batch time, but you can limit the number of items
in the batch

http://spark.apache.org/docs/latest/configuration.html

spark.streaming.backpressure.enabled

spark.streaming.kafka.maxRatePerPartition

On Tue, Jan 3, 2017 at 4:00 AM, 周家帅  wrote:
> Hi,
>
> I am an intermediate spark user and have some experience in large data
> processing. I post this question in StackOverflow but receive no response.
> My problem is as follows:
>
> I use createDirectStream in my spark streaming application. I set the batch
> interval to 7 seconds and most of the time the batch job can finish within
> about 5 seconds. However, in very rare cases, the batch job need cost 60
> seconds and this will delay some batches of jobs. To cut down the total
> delay time for these batches, I hope I can process more streaming data which
> spread over the delayed jobs at one time. This will help the streaming
> return to normal as soon as possible.
>
> So, I want to know there is some method to dynamically update/merge batch
> size of input for spark and kafka when delay appears.
>
> Many thanks for your help.
>
>
> --
> Jiashuai Zhou
>
> School of Electronics Engineering and Computer Science,
> Peking University
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Can't access the data in Kafka Spark Streaming globally

2016-12-23 Thread Cody Koeninger
This doesn't sound like a question regarding Kafka streaming, it
sounds like confusion about the scope of variables in spark generally.
Is that right?  If so, I'd suggest reading the documentation, starting
with a simple rdd (e.g. using sparkContext.parallelize), and
experimenting to confirm your understanding.

On Thu, Dec 22, 2016 at 11:46 PM, Sree Eedupuganti  wrote:
> I am trying to stream the data from Kafka to Spark.
>
> JavaPairInputDStream directKafkaStream =
> KafkaUtils.createDirectStream(ssc,
> String.class,
> String.class,
> StringDecoder.class,
> StringDecoder.class,
> kafkaParams, topics);
>
> Here i am iterating over the JavaPairInputDStream to process the RDD's.
>
> directKafkaStream.foreachRDD(rdd ->{
> rdd.foreachPartition(items ->{
> while (items.hasNext()) {
> String[] State = items.next()._2.split("\\,");
>
> System.out.println(State[2]+","+State[3]+","+State[4]+"--");
> };
> });
> });
>
>
> In this i can able to access the String Array but when i am trying to access
> the String Array data globally i can't access the data. Here my requirement
> is if i had access these data globally i had another lookup table in Hive.
> So i am trying to perform an operation on these. Any suggestions please,
> Thanks.
>
>
> --
> Best Regards,
> Sreeharsha Eedupuganti

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Why foreachPartition function make duplicate invocation to map function for every message ? (Spark 2.0.2)

2016-12-16 Thread Cody Koeninger
Please post a minimal complete code example of what you are talking about

On Thu, Dec 15, 2016 at 6:00 PM, Michael Nguyen
 wrote:
> I have the following sequence of Spark Java API calls (Spark 2.0.2):
>
> Kafka stream that is processed via a map function, which returns the string
> value from tuple2._2() for JavaDStream as in
>
> return tuple2._2();
>
> The returned JavaDStream is then processed by foreachPartition, which is
> wrapped by foreachRDD.
>
> foreachPartition's call function does Iterator on the RDD as in
> inputRDD.next ();
>
> When data is received, step 1 is executed, which is correct. However,
> inputRDD.next () in step 3 makes a duplicate call to the map function in
> step 1. So that map function is called twice for every message:
>
> -  the first time when the message is received from the Kafka stream, and
>
> - the second time when Iterator inputParams.next () is invoked from
> foreachPartition's call function.
>
> I also tried transforming the data in the map function as in
>
> public TestTransformedClass call(Tuple2  tuple2) for step 1
>
> public void call(Iterator  inputParams) for step 3
>
> and the same issue occurs. So this issue occurs, no matter whether this
> sequence of Spark API calls involves data transformation or not.
>
> Questions:
>
> Since the message was already processed in step 1, why does inputRDD.next ()
> in step 3 makes a duplicate call to the map function in step 1 ?
>
> How do I fix it to avoid duplicate invocation for every message ?
>
> Thanks.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2 or Spark 1.6.x?

2016-12-12 Thread Cody Koeninger
You certainly can use stable version of Kafka brokers with spark
2.0.2, why would you think otherwise?

On Mon, Dec 12, 2016 at 8:53 AM, Amir Rahnama  wrote:
> Hi,
>
> You need to describe more.
>
> For example, in Spark 2.0.2, you can't use stable versions of Apache Kafka.
>
> In general, I would say start with 2.0.2-
>
> On Mon, Dec 12, 2016 at 7:34 AM, Lohith Samaga M 
> wrote:
>>
>> Hi,
>>
>> I am new to Spark. I would like to learn Spark.
>>
>> I think I should learn version 2.0.2.
>>
>> Or should I still go for version 1.6.x and then come to version 2.0.2?
>>
>>
>>
>> Please advise.
>>
>>
>>
>> Thanks in advance.
>>
>>
>>
>> Best regards / Mit freundlichen Grüßen / Sincères salutations
>>
>> M. Lohith Samaga
>>
>>
>> Information transmitted by this e-mail is proprietary to Mphasis, its
>> associated companies and/ or its customers and is intended
>> for use only by the individual or entity to which it is addressed, and may
>> contain information that is privileged, confidential or
>> exempt from disclosure under applicable law. If you are not the intended
>> recipient or it appears that this mail has been forwarded
>> to you without proper authority, you are notified that any use or
>> dissemination of this information in any manner is strictly
>> prohibited. In such cases, please notify us immediately at
>> mailmas...@mphasis.com and delete this mail from your records.
>
>
>
>
> --
> Thanks and Regards,
>
> Amir Hossein Rahnama
>
> Tel: +46 (0) 729 785 012
> Website: www.ambodi.com
> Twitter: @_ambodi

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming with Kafka

2016-12-12 Thread Cody Koeninger
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#creating-a-direct-stream

Use a separate group id for each stream, like the docs say.

If you're doing multiple output operations, and aren't caching, spark
is going to read from kafka again each time, and if some of those
reads are happening for the same group and same topicpartition, it's
not going to work.

On Sun, Dec 11, 2016 at 2:36 PM, Oleksii Dukhno
 wrote:
> Hi Anton,
>
> What is the command you run your spark app with? Why not working with data
> instead of stream on your second stage operation? Can you provide logs with
> the issue?
>
> ConcurrentModificationException is not a spark issue, it means that you use
> the same Kafka consumer instance from more than one thread.
>
> Additionally,
>
> 1) As I understand new kafka consumer is created every time when you call
> KafkaUtils.createDirectStream.
> 2) If you assign the same group id to several consumer instances then all
> the consumers will get different set of messages on the same topic. This is
> a kind of load balancing which kafka provides with its Consumer API.
>
> Oleksii
>
> On 11 December 2016 at 18:46, Anton Okolnychyi 
> wrote:
>>
>> sorry, I forgot to mention that I was using Spark 2.0.2, Kafka 0.10, and
>> nothing custom.
>>
>>
>> I will try restate the initial question. Let's consider an example.
>>
>> 1. I create a stream and subscribe to a certain topic.
>>
>> val stream = KafkaUtils.createDirectStream(...)
>>
>> 2. I extract the actual data from the stream. For instance, word counts.
>>
>> val wordCounts = stream.map(record => (record.value(), 1))
>>
>> 3. Then I compute something and output the result to console.
>>
>> val firstResult = stream.reduceByWindow(...)
>> firstResult.print()
>>
>> Once that is done, I would like to perform another computation on top of
>> wordCounts and output that result again to console. In my current
>> understanding, I cannot just reuse wordCounts from Step 2 and should create
>> a new stream with another group id and then define the second computation.
>> Am I correct that if add the next part, then I can get
>> "ConcurrentModificationException: KafkaConsumer is not safe for
>> multi-threaded access"?
>>
>> // another computation on wordCounts
>> val secondResult = wordCounts.reduceByKeyAndWindow(...)
>> secondResult.output()
>>
>> Thanks,
>> Anton
>>
>> 2016-12-11 17:11 GMT+01:00 Timur Shenkao :
>>>
>>> Hi,
>>> Usual general questions are:
>>> -- what is your Spark version?
>>> -- what is your Kafka version?
>>> -- do you use "standard" Kafka consumer or try to implement something
>>> custom (your own multi-threaded consumer)?
>>>
>>> The freshest docs
>>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>>
>>> AFAIK, yes, you should use unique group id for each stream (KAFKA 0.10
>>> !!!)

 kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>>>
>>>
>>>
>>> On Sun, Dec 11, 2016 at 5:51 PM, Anton Okolnychyi
>>>  wrote:

 Hi,

 I am experimenting with Spark Streaming and Kafka. I will appreciate if
 someone can say whether the following assumption is correct.

 If I have multiple computations (each with its own output) on one stream
 (created as KafkaUtils.createDirectStream), then there is a chance to have
 ConcurrentModificationException: KafkaConsumer is not safe for
 multi-threaded access.  To solve this problem, I should create a new stream
 with different "group.id" for each computation.

 Am I right?

 Best regards,
 Anton
>>>
>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: problem with kafka createDirectStream ..

2016-12-09 Thread Cody Koeninger
I'd say unzip your actual assembly jar and verify whether the kafka
consumer classes are 0.10.1 or 0.10.0.  We've seen reports of odd
behavior with 0.10.1 classes.  Possibly unrelated, but good to
eliminate.

On Fri, Dec 9, 2016 at 10:38 AM, Debasish Ghosh
<ghosh.debas...@gmail.com> wrote:
> oops .. it's 0.10.0 .. sorry for the confusion ..
>
> On Fri, Dec 9, 2016 at 10:07 PM, Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>>
>> My assembly contains the 0.10.1 classes .. Here are the dependencies
>> related to kafka & spark that my assembly has ..
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.kafka"  %   "kafka-streams"  %
>> "0.10.0.0",
>>   "org.apache.spark" %%   "spark-streaming-kafka-0-10" % spark,
>>   "org.apache.spark" %%   "spark-core" % spark %
>> "provided",
>>   "org.apache.spark" %%   "spark-streaming"% spark %
>> "provided",
>>   "org.apache.spark" %%   "spark-mllib"% spark %
>> "provided",
>>   "org.apache.spark" %%   "spark-sql"  % spark %
>> "provided"
>> )
>>
>> regards.
>>
>> On Fri, Dec 9, 2016 at 10:00 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> When you say 0.10.1 do you mean broker version only, or does your
>>> assembly contain classes from the 0.10.1 kafka consumer?
>>>
>>> On Fri, Dec 9, 2016 at 10:19 AM, debasishg <ghosh.debas...@gmail.com>
>>> wrote:
>>> > Hello -
>>> >
>>> > I am facing some issues with the following snippet of code that reads
>>> > from
>>> > Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..)
>>> > with
>>> > Kafka 0.10.1 and Spark 2.0.1.
>>> >
>>> > // get the data from kafka
>>> > val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>>> >   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
>>> > streamingContext,
>>> > PreferConsistent,
>>> > Subscribe[Array[Byte], (String, String)](topicToReadFrom,
>>> > kafkaParams)
>>> >   )
>>> >
>>> > // label and vectorize the value
>>> > val projected: DStream[(String, Vector)] = stream.map { record =>
>>> >   val (label, value) = record.value
>>> >   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>>> >   (label, vector)
>>> > }.transform(projectToLowerDimension)
>>> >
>>> > In the above snippet if I have the call to transform in the last line,
>>> > I get
>>> > the following exception ..
>>> >
>>> > Caused by: java.util.ConcurrentModificationException: KafkaConsumer is
>>> > not
>>> > safe for multi-threaded access
>>> > at
>>> >
>>> > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>>> > at
>>> >
>>> > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
>>> > at
>>> >
>>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
>>> > at
>>> >
>>> > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
>>> > at
>>> >
>>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
>>> > at
>>> >
>>> > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
>>> > at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>>> > at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
>>> > at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>> > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>> > at
>>> >
>>> > scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>>> > at
>>> >
>>> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>>> > at
>>> >
>>> > scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>>> > at

Re: problem with kafka createDirectStream ..

2016-12-09 Thread Cody Koeninger
When you say 0.10.1 do you mean broker version only, or does your
assembly contain classes from the 0.10.1 kafka consumer?

On Fri, Dec 9, 2016 at 10:19 AM, debasishg  wrote:
> Hello -
>
> I am facing some issues with the following snippet of code that reads from
> Kafka and creates DStream. I am using KafkaUtils.createDirectStream(..) with
> Kafka 0.10.1 and Spark 2.0.1.
>
> // get the data from kafka
> val stream: DStream[ConsumerRecord[Array[Byte], (String, String)]] =
>   KafkaUtils.createDirectStream[Array[Byte], (String, String)](
> streamingContext,
> PreferConsistent,
> Subscribe[Array[Byte], (String, String)](topicToReadFrom, kafkaParams)
>   )
>
> // label and vectorize the value
> val projected: DStream[(String, Vector)] = stream.map { record =>
>   val (label, value) = record.value
>   val vector = Vectors.dense(value.split(",").map(_.toDouble))
>   (label, vector)
> }.transform(projectToLowerDimension)
>
> In the above snippet if I have the call to transform in the last line, I get
> the following exception ..
>
> Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not
> safe for multi-threaded access
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
> at
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
> at scala.collection.AbstractIterator.to(Iterator.scala:1336)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
> 
>
> The transform method does a PCA and gives the top 2 principal components ..
>
> private def projectToLowerDimension: RDD[(String, Vector)] => RDD[(String,
> Vector)] = { rdd =>
>   if (rdd.isEmpty) rdd else {
> // reduce to 2 dimensions
> val pca = new PCA(2).fit(rdd.map(_._2))
>
> // Project vectors to the linear space spanned by the top 2 principal
> // components, keeping the label
> rdd.map(p => (p._1, pca.transform(p._2)))
>   }
> }
>
> However if I remove the transform call, I can process everything correctly.
>
> Any help will be most welcome ..
>
> regards.
> - Debasish
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/problem-with-kafka-createDirectStream-tp28190.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Reprocessing failed jobs in Streaming job

2016-12-07 Thread Cody Koeninger
If your operations are idempotent, you should be able to just run a
totally separate job that looks for failed batches and does a kafkaRDD
to reprocess that batch.  C* probably isn't the first choice for what
is essentially a queue, but if the frequency of batches is relatively
low it probably doesn't matter.

That is indeed a weird stacktrace, did you investigate driver logs to
see if there was something else preceding it?

On Wed, Dec 7, 2016 at 2:41 PM, map reduced <k3t.gi...@gmail.com> wrote:
>> Personally I think forcing the stream to fail (e.g. check offsets in
>> downstream store and throw exception if they aren't as expected) is
>> the safest thing to do.
>
>
> I would think so too, but just for say 2-3 (sometimes just 1) failed batches
> in a whole day, I am trying to not kill the whole processing and restart.
>
> I am storing the offsets per batch and success/failure in a separate C*
> table - checkpointing was not an option due to it not working with
> application jar change etc.  Since I have access to the offsets, you think
> #2 or some variation of it may work?
>
> Btw, some of those failures I mentioned are strange, for instance (Spark
> 2.0.0 and spark-streaming-kafka-0-8_2.11):
>
> Job aborted due to stage failure: Task 173 in stage 92312.0 failed 10 times,
> most recent failure: Lost task 173.9 in stage 92312.0 (TID 27689025,
> 17.162.114.161): java.util.NoSuchElementException
>   at
> java.util.concurrent.ConcurrentSkipListMap.firstKey(ConcurrentSkipListMap.java:2036)
>   at
> com.yammer.metrics.stats.ExponentiallyDecayingSample.update(ExponentiallyDecayingSample.java:102)
>   at
> com.yammer.metrics.stats.ExponentiallyDecayingSample.update(ExponentiallyDecayingSample.java:81)
>   at com.yammer.metrics.core.Histogram.update(Histogram.java:110)
>   at com.yammer.metrics.core.Timer.update(Timer.java:198)
>   at com.yammer.metrics.core.Timer.update(Timer.java:76)
>   at com.yammer.metrics.core.TimerContext.stop(TimerContext.java:31)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:36)
>   at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
>   at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
>   at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
>   at
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:209)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>   at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> On Wed, Dec 7, 2016 at 12:16 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Personally I think forcing the stream to fail (e.g. check offsets in
>> downstream store and throw exception if they aren't as expected) is
>> the safest thing to do.
>>
>> If you proceed after a failure, you need a place to reliably record
>> the batches that failed for later processing.
>>
>> On Wed, Dec 7, 2016 at 1:46 PM, map reduced <k3t.gi...@gmail.com> wrote:
>> > Hi,
>> >
>> > I am trying to solve this problem - in my streaming flow, every day few
>> > jobs
>> > fail due to some (say kafka cluster maintenance etc, mostly unavoidable)
>> > reasons for few batches and resumes back to success.
>> > I want to reprocess those failed jobs programmatically (assume I have a
>> > way
>> > of getting start-end offsets for kafka topics for failed jobs). I was
>> > thinking of these options:
>> > 1) Somehow pause streaming job when it detects failing jobs - this seems
>> > not
>> > possible.
>> > 2) From driver - run additional proce

Re: Reprocessing failed jobs in Streaming job

2016-12-07 Thread Cody Koeninger
Personally I think forcing the stream to fail (e.g. check offsets in
downstream store and throw exception if they aren't as expected) is
the safest thing to do.

If you proceed after a failure, you need a place to reliably record
the batches that failed for later processing.

On Wed, Dec 7, 2016 at 1:46 PM, map reduced  wrote:
> Hi,
>
> I am trying to solve this problem - in my streaming flow, every day few jobs
> fail due to some (say kafka cluster maintenance etc, mostly unavoidable)
> reasons for few batches and resumes back to success.
> I want to reprocess those failed jobs programmatically (assume I have a way
> of getting start-end offsets for kafka topics for failed jobs). I was
> thinking of these options:
> 1) Somehow pause streaming job when it detects failing jobs - this seems not
> possible.
> 2) From driver - run additional processing to check every few minutes using
> driver rest api (/api/v1/applications...) what jobs have failed and submit
> batch jobs for those failed jobs
>
> 1 - doesn't seem to be possible, and I don't want to kill streaming context
> just for few failing batches to stop the job for some time and resume after
> few minutes.
> 2 - seems like a viable option, but a little complicated, since even the
> batch job can fail due to whatever reasons and I am back to tracking that
> separately etc.
>
> Does anyone has faced this issue or have any suggestions?
>
> Thanks,
> KP

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - join streaming and static data

2016-12-06 Thread Cody Koeninger
You do not need recent versions of spark, kafka, or structured
streaming in order to do this.  Normal DStreams are sufficient.

You can parallelize your static data from the database to an RDD, and
there's a join method available on RDDs.  Transforming a single given
timestamp line into multiple lines with modified timestamps can be
done using flatMap.

On Tue, Dec 6, 2016 at 11:11 AM, Burak Yavuz  wrote:
> Hi Daniela,
>
> This is trivial with Structured Streaming. If your Kafka cluster is 0.10.0
> or above, you may use Spark 2.0.2 to create a Streaming DataFrame from
> Kafka, and then also create a DataFrame using the JDBC connection, and you
> may join those. In Spark 2.1, there's support for a function called
> "from_json", which should also help you easily parse your messages incoming
> from Kafka.
>
> Best,
> Burak
>
> On Tue, Dec 6, 2016 at 2:16 AM, Daniela S  wrote:
>>
>> Hi
>>
>> I have some questions regarding Spark Streaming.
>>
>> I receive a stream of JSON messages from Kafka.
>> The messages consist of a timestamp and an ID.
>>
>> timestamp ID
>> 2016-12-06 13:001
>> 2016-12-06 13:405
>> ...
>>
>> In a database I have values for each ID:
>>
>> ID   minute  value
>> 1 0   3
>> 1 1   5
>> 1 2   7
>> 1 3   8
>> 5 0   6
>> 5 1   6
>> 5 2   8
>> 5 3   5
>> 5 4   6
>>
>> So I would like to join each incoming JSON message with the corresponding
>> values. It should look as follows:
>>
>> timestamp ID   minute  value
>> 2016-12-06 13:001 0   3
>> 2016-12-06 13:001 1   5
>> 2016-12-06 13:001 2   7
>> 2016-12-06 13:001 3   8
>> 2016-12-06 13:405 0   6
>> 2016-12-06 13:405 1   6
>> 2016-12-06 13:405 2   8
>> 2016-12-06 13:405 3   5
>> 2016-12-06 13:405 4   6
>> ...
>>
>> Then I would like to add the minute values to the timestamp. I only need
>> the computed timestamp and the values. So the result should look as follows:
>>
>> timestamp   value
>> 2016-12-06 13:00  3
>> 2016-12-06 13:01  5
>> 2016-12-06 13:02  7
>> 2016-12-06 13:03  8
>> 2016-12-06 13:40  6
>> 2016-12-06 13:41  6
>> 2016-12-06 13:42  8
>> 2016-12-06 13:43  5
>> 2016-12-06 13:44  6
>> ...
>>
>> Is this a possible use case for Spark Streaming? I thought I could join
>> the streaming data with the static data but I am not sure how to add the
>> minute values to the timestamp. Is this possible with Spark Streaming?
>>
>> Thank you in advance.
>>
>> Best regards,
>> Daniela
>>
>> - To
>> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Can spark support exactly once based kafka ? Due to these following question?

2016-12-05 Thread Cody Koeninger
Have you read / watched the materials linked from
https://github.com/koeninger/kafka-exactly-once

On Mon, Dec 5, 2016 at 4:17 AM, Jörn Franke  wrote:
> You need to do the book keeping of what has been processed yourself. This
> may mean roughly the following (of course the devil is in the details):
> Write down in zookeeper which part of the processing job has been done and
> for which dataset all the data has been created (do not keep the data itself
> in zookeeper).
> Once you start a processing job, check in zookeeper if it has been
> processed, if not remove all staging data, if yes terminate.
>
> As I said the details depend on your job and require some careful thinking,
> but exactly once can be achieved with Spark (and potentially zookeeper or
> similar, such as Redis).
> Of course at the same time think if you need delivery in order etc.
>
> On 5 Dec 2016, at 08:59, Michal Šenkýř  wrote:
>
> Hello John,
>
> 1. If a task complete the operation, it will notify driver. The driver may
> not receive the message due to the network, and think the task is still
> running. Then the child stage won't be scheduled ?
>
> Spark's fault tolerance policy is, if there is a problem in processing a
> task or an executor is lost, run the task (and any dependent tasks) again.
> Spark attempts to minimize the number of tasks it has to recompute, so
> usually only a small part of the data is recomputed.
>
> So in your case, the driver simply schedules the task on another executor
> and continues to the next stage when it receives the data.
>
> 2. how do spark guarantee the downstream-task can receive the shuffle-data
> completely. As fact, I can't find the checksum for blocks in spark. For
> example, the upstream-task may shuffle 100Mb data, but the downstream-task
> may receive 99Mb data due to network. Can spark verify the data is received
> completely based size ?
>
> Spark uses compression with checksuming for shuffle data so it should know
> when the data is corrupt and initiate a recomputation.
>
> As for your question in the subject:
> All of this means that Spark supports at-least-once processing. There is no
> way that I know of to ensure exactly-once. You can try to minimize
> more-than-once situations by updating your offsets as soon as possible but
> that does not eliminate the problem entirely.
>
> Hope this helps,
>
> Michal Senkyr

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Back-pressure to Spark Kafka Streaming?

2016-12-05 Thread Cody Koeninger
If you want finer-grained max rate setting, SPARK-17510 got merged a
while ago.  There's also SPARK-18580 which might help address the
issue of starting backpressure rate for the first batch.

On Mon, Dec 5, 2016 at 4:18 PM, Liren Ding  wrote:
> Hey all,
>
> Does backressure actually work on spark kafka streaming? According to the
> latest spark streaming document:
> http://spark.apache.org/docs/latest/streaming-programming-guide.html
> "In Spark 1.5, we have introduced a feature called backpressure that
> eliminate the need to set this rate limit, as Spark Streaming automatically
> figures out the rate limits and dynamically adjusts them if the processing
> conditions change. This backpressure can be enabled by setting the
> configuration parameter spark.streaming.backpressure.enabled to true."
> But I also see a few open spark jira tickets on this option:
> https://issues.apache.org/jira/browse/SPARK-7398
> https://issues.apache.org/jira/browse/SPARK-18371
>
> The case in the second ticket describes a similar issue as we have here. We
> use Kafka to send large batches (10~100M) to spark streaming, and the spark
> streaming interval is set to 1~4 minutes. With the backpressure set to true,
> the queued active batches still pile up when average batch processing time
> takes longer than default interval. After the spark driver is restarted, all
> queued batches turn to a giant batch, which block subsequent batches and
> also have a great chance to fail eventually. The only config we found that
> might help is "spark.streaming.kafka.maxRatePerPartition". It does limit the
> incoming batch size, but not a perfect solution since it depends on size of
> partition as well as the length of batch interval. For our case, hundreds of
> partitions X minutes of interval still produce a number that is too large
> for each batch. So we still want to figure out how to make the backressure
> work in spark kafka streaming, if it is supposed to work there. Thanks.
>
>
> Liren
>
>
>
>
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Mac vs cluster Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Cody Koeninger
This is running locally on my mac, but it's still a standalone spark
master with multiple separate executor jvms (i.e. using --master not
--local[2]), so it should be the same code paths.  I can't speak to
yarn one way or the other, but you said you tried it with the
standalone scheduler.

At the very least, you should run ./bin/kafka-run-class.sh
kafka.tools.GetOffsetShell  with -1 and -2 and compare those results
to what you're seeing from spark.  The results you posted from spark
didn't show any incoming messages at all.

On Sat, Nov 19, 2016 at 11:12 AM, Hster Geguri
<hster.investiga...@gmail.com> wrote:
> Hi Cody,
>
> Thank you for testing this on a Saturday morning!  I failed to mention that
> when our data engineer runs our drivers(even complex ones) locally on his
> Mac, the drivers work fine. However when we launch it into the cluster (4
> machines either for a YARN cluster or spark standalone) we get this issue.
>
> Heji
>
>
> On Sat, Nov 19, 2016 at 8:53 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> I ran your example using the versions of kafka and spark you are
>> using, against a standalone cluster.  This is what I observed:
>>
>> (in kafka working directory)
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -2
>> simple_logtest:2:0
>> simple_logtest:4:0
>> simple_logtest:1:0
>> simple_logtest:3:0
>> simple_logtest:0:0
>>
>> bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
>> --broker-list 'localhost:9092' --topic simple_logtest --time -1
>> simple_logtest:2:31
>> simple_logtest:4:31
>> simple_logtest:1:31
>> simple_logtest:3:31
>> simple_logtest:0:31
>>
>> So in other words, there are 5 partitions, they all have messages in them
>>
>> (in spark working directory)
>>
>> bash-3.2$ ./bin/spark-submit --master
>> spark://Codys-MacBook-Pro.local:7077 --class
>> example.SimpleKafkaLoggingDriver
>>
>> /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
>> localhost:9092 simple_logtest mygroup earliest
>>
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>>
>> simple_logtest 3 offsets: 0 to 31
>> simple_logtest 0 offsets: 0 to 31
>> simple_logtest 1 offsets: 0 to 31
>> simple_logtest 2 offsets: 0 to 31
>> simple_logtest 4 offsets: 0 to 31
>>
>> 16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
>> 1479574025000 ms.0 from job set of time 1479574025000 ms
>> 16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
>> 1479574025000 ms (execution: 0.005 s)
>> 16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
>> 16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
>> 16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 147957403 ms
>> 16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
>> 147957403 ms.0 from job set of time 147957403 ms
>>
>> simple_logtest 3 offsets: 31 to 31
>> simple_logtest 0 offsets: 31 to 31
>> simple_logtest 1 offsets: 31 to 31
>> simple_logtest 2 offsets: 31 to 31
>> simple_logtest 4 offsets: 31 to 31
>>
>> So in other words, spark is indeed seeing offsets for each partition.
>>
>>
>> The results you posted look to me like there aren't any messages going
>> into the other partitions, which looks like a misbehaving producer.
>>
>> On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
>> <hster.investiga...@gmail.com> wrote:
>> > Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
>> > been
>> > struggling with this show stopper problem.
>> >
>> > When we run our drivers with auto.offset.reset=latest ingesting from a
>> > single kafka topic with 10 partitions, the driver reads correctly from
>> > all
>> > 10 partitions.
>> >
>> > However when we use auto.offset.reset=earliest, the driver will read
>> > only a
>> > single partition.
>> >
>> > When we turn on the debug logs, we sometimes see partitions being set to
>> > different offset configuration even though the consumer config correctly
>> > indicates auto.offset.reset=earliest.
>> >
>> >> 8 DEBUG Resetting offset for partition simple_test-8 to earliest
>> >> offset.
>> >> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> >> 9 DEBUG Resetting offset for partition simple_test-

Re: using StreamingKMeans

2016-11-19 Thread Cody Koeninger
So I haven't played around with streaming k means at all, but given
that no one responded to your message a couple of days ago, I'll say
what I can.

1. Can you not sample out some % of the stream for training?
2. Can you run multiple streams at the same time with different values
for k and compare their performance?
3. foreachRDD is fine in general, can't speak to the specifics.
4. If you haven't done any transformations yet on a direct stream,
foreachRDD will give you a KafkaRDD.  Checking if a KafkaRDD is empty
is very cheap, it's done on the driver only because the beginning and
ending offsets are known.  So you should be able to skip empty
batches.



On Sat, Nov 19, 2016 at 10:46 AM, debasishg  wrote:
> Hello -
>
> I am trying to implement an outlier detection application on streaming data.
> I am a newbie to Spark and hence would like some advice on the confusions
> that I have ..
>
> I am thinking of using StreamingKMeans - is this a good choice ? I have one
> stream of data and I need an online algorithm. But here are some questions
> that immediately come to my mind ..
>
> 1. I cannot do separate training, cross validation etc. Is this a good idea
> to do training and prediction online ?
>
> 2. The data will be read from the stream coming from Kafka in microbatches
> of (say) 3 seconds. I get a DStream on which I train and get the clusters.
> How can I decide on the number of clusters ? Using StreamingKMeans is there
> any way I can iterate on microbatches with different values of k to find the
> optimal one ?
>
> 3. Even if I fix k, after training on every microbatch I get a DStream. How
> can I compute things like clustering score on the DStream ?
> StreamingKMeansModel has a computeCost function but it takes an RDD. I can
> use dstream.foreachRDD { // process RDD for the micro batch here } - is this
> the idiomatic way ?
>
> 4. If I use dstream.foreachRDD { .. } and use functions like new
> StandardScaler().fit(rdd) to do feature normalization, then it works when I
> have data in the stream. But when the microbatch is empty (say I don't have
> data for some time), the fit method throws exception as it gets an empty
> collection. Things start working ok when data starts coming back to the
> stream. But is this the way to go ?
>
> any suggestion will be welcome ..
>
> regards.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/using-StreamingKMeans-tp28109.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka direct approach,App UI shows wrong input rate

2016-11-19 Thread Cody Koeninger
There have definitely been issues with UI reporting for the direct
stream in the past, but I'm not able to reproduce this with 2.0.2 and
0.8.  See below:

https://i.imgsafe.org/086019ae57.png



On Fri, Nov 18, 2016 at 4:38 AM, Julian Keppel
 wrote:
> Hello,
>
> I use Spark 2.0.2 with Kafka integration 0-8. The Kafka version is 0.10.0.1
> (Scala 2.11). I read data from Kafka with the direct approach. The complete
> infrastructure runs on Google Container Engine.
>
> I wonder why the corresponding application UI says the input rate is zero
> records per second. This is definitely wrong. I checked it while I printed
> out the incoming records to the driver console. All other metrics seem to be
> correct (at least they are realistic).
>
> What is going on here? Do you have any idea? Thanks for you help.
>
> Julian

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-19 Thread Cody Koeninger
I ran your example using the versions of kafka and spark you are
using, against a standalone cluster.  This is what I observed:

(in kafka working directory)

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list 'localhost:9092' --topic simple_logtest --time -2
simple_logtest:2:0
simple_logtest:4:0
simple_logtest:1:0
simple_logtest:3:0
simple_logtest:0:0

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list 'localhost:9092' --topic simple_logtest --time -1
simple_logtest:2:31
simple_logtest:4:31
simple_logtest:1:31
simple_logtest:3:31
simple_logtest:0:31

So in other words, there are 5 partitions, they all have messages in them

(in spark working directory)

bash-3.2$ ./bin/spark-submit --master
spark://Codys-MacBook-Pro.local:7077 --class
example.SimpleKafkaLoggingDriver
/private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar
localhost:9092 simple_logtest mygroup earliest


16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job
1479574025000 ms.0 from job set of time 1479574025000 ms

simple_logtest 3 offsets: 0 to 31
simple_logtest 0 offsets: 0 to 31
simple_logtest 1 offsets: 0 to 31
simple_logtest 2 offsets: 0 to 31
simple_logtest 4 offsets: 0 to 31

16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job
1479574025000 ms.0 from job set of time 1479574025000 ms
16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time
1479574025000 ms (execution: 0.005 s)
16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 147957403 ms
16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job
147957403 ms.0 from job set of time 147957403 ms

simple_logtest 3 offsets: 31 to 31
simple_logtest 0 offsets: 31 to 31
simple_logtest 1 offsets: 31 to 31
simple_logtest 2 offsets: 31 to 31
simple_logtest 4 offsets: 31 to 31

So in other words, spark is indeed seeing offsets for each partition.


The results you posted look to me like there aren't any messages going
into the other partitions, which looks like a misbehaving producer.

On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
 wrote:
> Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have been
> struggling with this show stopper problem.
>
> When we run our drivers with auto.offset.reset=latest ingesting from a
> single kafka topic with 10 partitions, the driver reads correctly from all
> 10 partitions.
>
> However when we use auto.offset.reset=earliest, the driver will read only a
> single partition.
>
> When we turn on the debug logs, we sometimes see partitions being set to
> different offset configuration even though the consumer config correctly
> indicates auto.offset.reset=earliest.
>
>> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>> to broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>> to broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>> from broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>> from broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>
>
>
> I've enclosed below the completely stripped down trivial test driver that
> shows this behavior. We normally run with YARN 2.7.3 but have also tried
> running spark standalone mode which has the same behavior. Our drivers are
> normally java but we have tried the scala version which also has the same
> incorrect behavior. We have tried different LocationStrategies and partition
> assignment strategies all without success.  Any insight would be greatly
> appreciated.
>
> package com.x.labs.analytics.diagnostics.spark.drivers
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import 

Re: Kafka segmentation

2016-11-19 Thread Cody Koeninger
I mean I don't understand exactly what the issue is.  Can you fill in
these blanks

My settings are :

My code is :

I expected to see :

Instead, I saw :

On Thu, Nov 17, 2016 at 12:53 PM, Hoang Bao Thien <hbthien0...@gmail.com> wrote:
> I am sorry I don't understand your idea. What do you mean exactly?
>
> On Fri, Nov 18, 2016 at 1:50 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Ok, I don't think I'm clear on the issue then.  Can you say what the
>> expected behavior is, and what the observed behavior is?
>>
>> On Thu, Nov 17, 2016 at 10:48 AM, Hoang Bao Thien <hbthien0...@gmail.com>
>> wrote:
>> > Hi,
>> >
>> > Thanks for your comments. But in fact, I don't want to limit the size of
>> > batches, it could be any greater size as it does.
>> >
>> > Thien
>> >
>> > On Fri, Nov 18, 2016 at 1:17 AM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> If you want a consistent limit on the size of batches, use
>> >> spark.streaming.kafka.maxRatePerPartition  (assuming you're using
>> >> createDirectStream)
>> >>
>> >> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>> >>
>> >> On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien
>> >> <hbthien0...@gmail.com>
>> >> wrote:
>> >> > Hi,
>> >> >
>> >> > I use CSV and other text files to Kafka just to test Kafka + Spark
>> >> > Streaming
>> >> > by using direct stream. That's why I don't want Spark streaming reads
>> >> > CSVs
>> >> > or text files directly.
>> >> > In addition, I don't want a giant batch of records like the link you
>> >> > sent.
>> >> > The problem is that we should receive the "similar" number of record
>> >> > of
>> >> > all
>> >> > batchs instead of the first two or three batches have so large number
>> >> > of
>> >> > records (e.g., 100K) but the last 1000 batches with only 200 records.
>> >> >
>> >> > I know that the problem is not from the auto.offset.reset=largest,
>> >> > but I
>> >> > don't know what I can do in this case.
>> >> >
>> >> > Do you and other ones could suggest me some solutions please as this
>> >> > seems
>> >> > the normal situation with Kafka+SpartStreaming.
>> >> >
>> >> > Thanks.
>> >> > Alex
>> >> >
>> >> >
>> >> >
>> >> > On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger <c...@koeninger.org>
>> >> > wrote:
>> >> >>
>> >> >> Yeah, if you're reporting issues, please be clear as to whether
>> >> >> backpressure is enabled, and whether maxRatePerPartition is set.
>> >> >>
>> >> >> I expect that there is something wrong with backpressure, see e.g.
>> >> >> https://issues.apache.org/jira/browse/SPARK-18371
>> >> >>
>> >> >> On Wed, Nov 16, 2016 at 5:05 PM, bo yang <bobyan...@gmail.com>
>> >> >> wrote:
>> >> >> > I hit similar issue with Spark Streaming. The batch size seemed a
>> >> >> > little
>> >> >> > random. Sometime it was large with many Kafka messages inside same
>> >> >> > batch,
>> >> >> > sometimes it was very small with just a few messages. Is it
>> >> >> > possible
>> >> >> > that
>> >> >> > was caused by the backpressure implementation in Spark Streaming?
>> >> >> >
>> >> >> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger
>> >> >> > <c...@koeninger.org>
>> >> >> > wrote:
>> >> >> >>
>> >> >> >> Moved to user list.
>> >> >> >>
>> >> >> >> I'm not really clear on what you're trying to accomplish (why put
>> >> >> >> the
>> >> >> >> csv file through Kafka instead of reading it directly with
>> >> >> >> spark?)
>> >> >> >>
>> >> >> >> auto.offset.reset=largest just means that when starting the job
>> >> >> >> without any defined offsets, it will start at the highest 

Re: Kafka segmentation

2016-11-17 Thread Cody Koeninger
Ok, I don't think I'm clear on the issue then.  Can you say what the
expected behavior is, and what the observed behavior is?

On Thu, Nov 17, 2016 at 10:48 AM, Hoang Bao Thien <hbthien0...@gmail.com> wrote:
> Hi,
>
> Thanks for your comments. But in fact, I don't want to limit the size of
> batches, it could be any greater size as it does.
>
> Thien
>
> On Fri, Nov 18, 2016 at 1:17 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> If you want a consistent limit on the size of batches, use
>> spark.streaming.kafka.maxRatePerPartition  (assuming you're using
>> createDirectStream)
>>
>> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>>
>> On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien <hbthien0...@gmail.com>
>> wrote:
>> > Hi,
>> >
>> > I use CSV and other text files to Kafka just to test Kafka + Spark
>> > Streaming
>> > by using direct stream. That's why I don't want Spark streaming reads
>> > CSVs
>> > or text files directly.
>> > In addition, I don't want a giant batch of records like the link you
>> > sent.
>> > The problem is that we should receive the "similar" number of record of
>> > all
>> > batchs instead of the first two or three batches have so large number of
>> > records (e.g., 100K) but the last 1000 batches with only 200 records.
>> >
>> > I know that the problem is not from the auto.offset.reset=largest, but I
>> > don't know what I can do in this case.
>> >
>> > Do you and other ones could suggest me some solutions please as this
>> > seems
>> > the normal situation with Kafka+SpartStreaming.
>> >
>> > Thanks.
>> > Alex
>> >
>> >
>> >
>> > On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> Yeah, if you're reporting issues, please be clear as to whether
>> >> backpressure is enabled, and whether maxRatePerPartition is set.
>> >>
>> >> I expect that there is something wrong with backpressure, see e.g.
>> >> https://issues.apache.org/jira/browse/SPARK-18371
>> >>
>> >> On Wed, Nov 16, 2016 at 5:05 PM, bo yang <bobyan...@gmail.com> wrote:
>> >> > I hit similar issue with Spark Streaming. The batch size seemed a
>> >> > little
>> >> > random. Sometime it was large with many Kafka messages inside same
>> >> > batch,
>> >> > sometimes it was very small with just a few messages. Is it possible
>> >> > that
>> >> > was caused by the backpressure implementation in Spark Streaming?
>> >> >
>> >> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger <c...@koeninger.org>
>> >> > wrote:
>> >> >>
>> >> >> Moved to user list.
>> >> >>
>> >> >> I'm not really clear on what you're trying to accomplish (why put
>> >> >> the
>> >> >> csv file through Kafka instead of reading it directly with spark?)
>> >> >>
>> >> >> auto.offset.reset=largest just means that when starting the job
>> >> >> without any defined offsets, it will start at the highest (most
>> >> >> recent) available offsets.  That's probably not what you want if
>> >> >> you've already loaded csv lines into kafka.
>> >> >>
>> >> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien
>> >> >> <hbthien0...@gmail.com>
>> >> >> wrote:
>> >> >> > Hi all,
>> >> >> >
>> >> >> > I would like to ask a question related to the size of Kafka
>> >> >> > stream. I
>> >> >> > want
>> >> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming
>> >> >> > to
>> >> >> > get
>> >> >> > the
>> >> >> > output from Kafka and then save to Hive by using SparkSQL. The
>> >> >> > file
>> >> >> > csv
>> >> >> > is
>> >> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields
>> >> >> > of
>> >> >> > integer). I see that Spark Streaming first received two
>> >> >> > partitions/batches,
>> >> >> > the first is of 60K messages and the second is of 50K msgs. But
>> >> >> > from
>> >> >> > the
>> >> >> > third batch, Spark just received 200 messages for each batch (or
>> >> >> > partition).
>> >> >> > I think that this problem is coming from Kafka or some
>> >> >> > configuration
>> >> >> > in
>> >> >> > Spark. I already tried to configure with the setting
>> >> >> > "auto.offset.reset=largest", but every batch only gets 200
>> >> >> > messages.
>> >> >> >
>> >> >> > Could you please tell me how to fix this problem?
>> >> >> > Thank you so much.
>> >> >> >
>> >> >> > Best regards,
>> >> >> > Alex
>> >> >> >
>> >> >>
>> >> >>
>> >> >> -
>> >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >>
>> >> >
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka segmentation

2016-11-17 Thread Cody Koeninger
If you want a consistent limit on the size of batches, use
spark.streaming.kafka.maxRatePerPartition  (assuming you're using
createDirectStream)

http://spark.apache.org/docs/latest/configuration.html#spark-streaming

On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien <hbthien0...@gmail.com> wrote:
> Hi,
>
> I use CSV and other text files to Kafka just to test Kafka + Spark Streaming
> by using direct stream. That's why I don't want Spark streaming reads CSVs
> or text files directly.
> In addition, I don't want a giant batch of records like the link you sent.
> The problem is that we should receive the "similar" number of record of all
> batchs instead of the first two or three batches have so large number of
> records (e.g., 100K) but the last 1000 batches with only 200 records.
>
> I know that the problem is not from the auto.offset.reset=largest, but I
> don't know what I can do in this case.
>
> Do you and other ones could suggest me some solutions please as this seems
> the normal situation with Kafka+SpartStreaming.
>
> Thanks.
> Alex
>
>
>
> On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Yeah, if you're reporting issues, please be clear as to whether
>> backpressure is enabled, and whether maxRatePerPartition is set.
>>
>> I expect that there is something wrong with backpressure, see e.g.
>> https://issues.apache.org/jira/browse/SPARK-18371
>>
>> On Wed, Nov 16, 2016 at 5:05 PM, bo yang <bobyan...@gmail.com> wrote:
>> > I hit similar issue with Spark Streaming. The batch size seemed a little
>> > random. Sometime it was large with many Kafka messages inside same
>> > batch,
>> > sometimes it was very small with just a few messages. Is it possible
>> > that
>> > was caused by the backpressure implementation in Spark Streaming?
>> >
>> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >>
>> >> Moved to user list.
>> >>
>> >> I'm not really clear on what you're trying to accomplish (why put the
>> >> csv file through Kafka instead of reading it directly with spark?)
>> >>
>> >> auto.offset.reset=largest just means that when starting the job
>> >> without any defined offsets, it will start at the highest (most
>> >> recent) available offsets.  That's probably not what you want if
>> >> you've already loaded csv lines into kafka.
>> >>
>> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien
>> >> <hbthien0...@gmail.com>
>> >> wrote:
>> >> > Hi all,
>> >> >
>> >> > I would like to ask a question related to the size of Kafka stream. I
>> >> > want
>> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to
>> >> > get
>> >> > the
>> >> > output from Kafka and then save to Hive by using SparkSQL. The file
>> >> > csv
>> >> > is
>> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
>> >> > integer). I see that Spark Streaming first received two
>> >> > partitions/batches,
>> >> > the first is of 60K messages and the second is of 50K msgs. But from
>> >> > the
>> >> > third batch, Spark just received 200 messages for each batch (or
>> >> > partition).
>> >> > I think that this problem is coming from Kafka or some configuration
>> >> > in
>> >> > Spark. I already tried to configure with the setting
>> >> > "auto.offset.reset=largest", but every batch only gets 200 messages.
>> >> >
>> >> > Could you please tell me how to fix this problem?
>> >> > Thank you so much.
>> >> >
>> >> > Best regards,
>> >> > Alex
>> >> >
>> >>
>> >> -
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka segmentation

2016-11-16 Thread Cody Koeninger
Yeah, if you're reporting issues, please be clear as to whether
backpressure is enabled, and whether maxRatePerPartition is set.

I expect that there is something wrong with backpressure, see e.g.
https://issues.apache.org/jira/browse/SPARK-18371

On Wed, Nov 16, 2016 at 5:05 PM, bo yang <bobyan...@gmail.com> wrote:
> I hit similar issue with Spark Streaming. The batch size seemed a little
> random. Sometime it was large with many Kafka messages inside same batch,
> sometimes it was very small with just a few messages. Is it possible that
> was caused by the backpressure implementation in Spark Streaming?
>
> On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Moved to user list.
>>
>> I'm not really clear on what you're trying to accomplish (why put the
>> csv file through Kafka instead of reading it directly with spark?)
>>
>> auto.offset.reset=largest just means that when starting the job
>> without any defined offsets, it will start at the highest (most
>> recent) available offsets.  That's probably not what you want if
>> you've already loaded csv lines into kafka.
>>
>> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien <hbthien0...@gmail.com>
>> wrote:
>> > Hi all,
>> >
>> > I would like to ask a question related to the size of Kafka stream. I
>> > want
>> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to get
>> > the
>> > output from Kafka and then save to Hive by using SparkSQL. The file csv
>> > is
>> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
>> > integer). I see that Spark Streaming first received two
>> > partitions/batches,
>> > the first is of 60K messages and the second is of 50K msgs. But from the
>> > third batch, Spark just received 200 messages for each batch (or
>> > partition).
>> > I think that this problem is coming from Kafka or some configuration in
>> > Spark. I already tried to configure with the setting
>> > "auto.offset.reset=largest", but every batch only gets 200 messages.
>> >
>> > Could you please tell me how to fix this problem?
>> > Thank you so much.
>> >
>> > Best regards,
>> > Alex
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka segmentation

2016-11-16 Thread Cody Koeninger
Moved to user list.

I'm not really clear on what you're trying to accomplish (why put the
csv file through Kafka instead of reading it directly with spark?)

auto.offset.reset=largest just means that when starting the job
without any defined offsets, it will start at the highest (most
recent) available offsets.  That's probably not what you want if
you've already loaded csv lines into kafka.

On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien  wrote:
> Hi all,
>
> I would like to ask a question related to the size of Kafka stream. I want
> to put data (e.g., file *.csv) to Kafka then use Spark streaming to get the
> output from Kafka and then save to Hive by using SparkSQL. The file csv is
> about 100MB with ~250K messages/rows (Each row has about 10 fields of
> integer). I see that Spark Streaming first received two partitions/batches,
> the first is of 60K messages and the second is of 50K msgs. But from the
> third batch, Spark just received 200 messages for each batch (or partition).
> I think that this problem is coming from Kafka or some configuration in
> Spark. I already tried to configure with the setting
> "auto.offset.reset=largest", but every batch only gets 200 messages.
>
> Could you please tell me how to fix this problem?
> Thank you so much.
>
> Best regards,
> Alex
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



  1   2   3   4   5   6   7   >