[ 
https://issues.apache.org/jira/browse/SPARK-18779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pranav Nakhe closed SPARK-18779.
--------------------------------
    Resolution: Not A Problem

I looked at the code and was able to recreate this issue with a Kafka client 
outside Spark, so I am closing this.

> Messages being received only from one partition when using Spark Streaming 
> integration for Kafka 0.10 with kafka client library at 0.10.1
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18779
>                 URL: https://issues.apache.org/jira/browse/SPARK-18779
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.0.2
>            Reporter: Pranav Nakhe
>
> I apologize for the earlier description, which wasn't very clear about the 
> issue. Here is a detailed description of the problem and my use case.
> I have a Spark application that consumes Kafka messages using the Spark 
> Kafka 0.10 integration. I now need to stop the application and have the 
> user specify a timestamp in the past from which the application should 
> resume reading messages (replaying messages). The timestamp is mapped to a 
> Kafka offset using the 'offsetsForTimes' API in KafkaConsumer, introduced 
> in the 0.10.1.0 Kafka client. That offset is then used to create the DStream.
> Because Kafka 0.10.0.1 does not have the 'offsetsForTimes' API, I need to 
> use Kafka 0.10.1.0.
> To achieve that, I replaced the 0.10.0.1 jar in the Spark environment with 
> the 0.10.1.0 jar. Things started working, but the application could read 
> messages only from the first partition.
> To recreate the issue, I wrote a local program with the 0.10.1.0 jar on the 
> classpath:
> ********************************
> val topics = Set("Z1Topic")
> val topicPartitionOffsetMap = new HashMap[TopicPartition, Long]()
> // Offset hardcoded to 10 instead of getting it from 'offsetsForTimes'
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic",0), 10L)
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic",1), 10L)
> import scala.collection.JavaConversions._
> val stream = KafkaUtils.createDirectStream[String, String](ssc, 
> PreferBrokers, Subscribe[String, String](topics, kafkaParams, 
> topicPartitionOffsetMap))
> val x = stream.map(x => x.value())
> x.print()
> ********************************
> This printed only the messages in the first partition, starting from offset 
> 10. (This is with the 0.10.1.0 client.)
> If I use the Kafka 0.10.0.1 client for the above program, things work fine 
> and I receive messages from all partitions, but I can't use the 
> 'offsetsForTimes' API (because it doesn't exist in the 0.10.0.1 client).
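
For reference, the timestamp-to-offset lookup mentioned in the description
could be sketched roughly as follows. This is only an illustration, not the
reporter's code: the object name ReplayOffsets, the helpers toStartingOffsets
and offsetsAt, and their parameters are hypothetical. It assumes a
kafka-clients jar at 0.10.1.0 or later on the classpath and a reachable
broker. Falling back to the end offset when no message matches the timestamp
is an assumption made here, not something stated in the issue.

```scala
import java.util.{HashMap => JHashMap, Map => JMap, Properties}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndTimestamp}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper object; all names here are illustrative.
object ReplayOffsets {

  // Converts the result of offsetsForTimes into starting offsets.
  // offsetsForTimes maps a partition to null when no message has a
  // timestamp at or after the requested one; such partitions use `fallback`.
  def toStartingOffsets(
      found: JMap[TopicPartition, OffsetAndTimestamp],
      fallback: TopicPartition => Long): Map[TopicPartition, Long] =
    found.asScala.map { case (tp, oat) =>
      tp -> (if (oat == null) fallback(tp) else oat.offset())
    }.toMap

  // Looks up, for each listed partition, the earliest offset whose message
  // timestamp is >= timestampMs. Requires a live broker.
  def offsetsAt(
      bootstrapServers: String,
      topic: String,
      partitions: Seq[Int],
      timestampMs: Long): Map[TopicPartition, Long] = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val query = new JHashMap[TopicPartition, java.lang.Long]()
      partitions.foreach { p =>
        query.put(new TopicPartition(topic, p), java.lang.Long.valueOf(timestampMs))
      }
      val found = consumer.offsetsForTimes(query)
      // Assumption: start at the end of the log when nothing matches.
      val ends = consumer.endOffsets(query.keySet())
      toStartingOffsets(found, tp => ends.get(tp).longValue())
    } finally {
      consumer.close()
    }
  }
}
```

The resulting map could then be passed as the offsets argument of Subscribe
in place of the hardcoded values in the program above, for example
ReplayOffsets.offsetsAt("localhost:9092", "Z1Topic", Seq(0, 1), timestampMs),
where the broker address is a placeholder.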



