[
https://issues.apache.org/jira/browse/SPARK-18779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pranav Nakhe closed SPARK-18779.
--------------------------------
Resolution: Not A Problem
I looked at the code and was able to recreate this issue with a plain Kafka
client outside Spark. Hence closing this.
> Messages being received only from one partition when using Spark Streaming
> integration for Kafka 0.10 with kafka client library at 0.10.1
> -----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-18779
> URL: https://issues.apache.org/jira/browse/SPARK-18779
> Project: Spark
> Issue Type: Improvement
> Components: DStreams
> Affects Versions: 2.0.2
> Reporter: Pranav Nakhe
>
> I apologize for the earlier description, which wasn't very clear about the
> issue. Here is a detailed description of the problem and my use case.
> I have a Spark application that consumes Kafka messages using the Spark
> Kafka 0.10 integration. I now need to stop the application, after which the
> user specifies a timestamp in the past from which the application should
> start reading messages (replaying messages). The timestamp is mapped to a
> Kafka offset using the 'offsetsForTimes' API in KafkaConsumer, introduced
> in the 0.10.1.0 Kafka client. That offset is then used to create the DStream.
> Because Kafka 0.10.0.1 does not have the 'offsetsForTimes' API, I need to use
> Kafka 0.10.1.0.
> To achieve that, I replaced the 0.10.0.1 jar in the Spark environment with
> the 0.10.1.0 jar. Things started working, but the application could read
> messages only from the first partition.
> To recreate the issue I wrote a local program with the 0.10.1.0 jar on the
> classpath:
> ********************************
> import scala.collection.JavaConversions._
>
> val topics = Set("Z1Topic")
> val topicPartitionOffsetMap = new HashMap[TopicPartition, Long]()
> // hardcoded offset to 10 instead of getting the offset from 'offsetsForTimes'
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 0), 10L)
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 1), 10L)
>
> val stream = KafkaUtils.createDirectStream[String, String](
>   ssc, PreferBrokers,
>   Subscribe[String, String](topics, kafkaParams, topicPartitionOffsetMap))
> val x = stream.map(x => x.value())
> x.print()
> ********************************
> This printed only the messages in the first partition, from offset 10. (This
> is with the 0.10.1.0 client.)
> If I use the Kafka 0.10.0.1 client for the above program, things work fine
> and I receive messages from all partitions, but I can't use the
> 'offsetsForTimes' API (because it doesn't exist in the 0.10.0.1 client).
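
The timestamp-to-offset step described in the issue can be sketched roughly as
follows. This is an illustration only, not the reporter's code: the object
`ReplayOffsets` and its methods `resolved` and `startingOffsets` are
hypothetical names, the consumer is assumed to be an already configured
`KafkaConsumer[String, String]`, and kafka-clients 0.10.1.0 or later is
assumed to be on the classpath.

```scala
import java.{lang => jl, util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndTimestamp}
import org.apache.kafka.common.TopicPartition

// Hypothetical helper, named here for illustration only.
object ReplayOffsets {

  // Keep only the partitions for which an offset was found. The result of
  // offsetsForTimes may map a partition to null when no matching message
  // exists, so those entries are dropped.
  def resolved(
      found: ju.Map[TopicPartition, OffsetAndTimestamp]): Map[TopicPartition, Long] =
    found.asScala.collect {
      case (tp, oat) if oat != null => tp -> oat.offset()
    }.toMap

  // Look up, for every partition of `topic`, the earliest offset whose
  // timestamp is at or after `timestampMs` (milliseconds since the epoch).
  def startingOffsets(
      consumer: KafkaConsumer[String, String],
      topic: String,
      timestampMs: Long): Map[TopicPartition, Long] = {
    val query = new ju.HashMap[TopicPartition, jl.Long]()
    consumer.partitionsFor(topic).asScala.foreach { info =>
      query.put(
        new TopicPartition(info.topic(), info.partition()),
        jl.Long.valueOf(timestampMs))
    }
    resolved(consumer.offsetsForTimes(query))
  }
}
```

The resulting map could then be passed as the offsets argument of `Subscribe`
in place of the hardcoded `topicPartitionOffsetMap`. Partitions absent from
the map would start from the committed offset or from whatever
`auto.offset.reset` dictates.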