[
https://issues.apache.org/jira/browse/SPARK-18779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pranav Nakhe reopened SPARK-18779:
----------------------------------
Well, there seems to be an issue in Spark after all.
In the ConsumerStrategy class the consumer is paused:
// we've called poll, we must pause or next poll may consume messages and set position
consumer.pause(consumer.assignment())
We never resume the consumer, and that seems to be causing the issue. The
KafkaConsumer implementation changed between the 10.0.1 and 10.1.0 clients,
which has exposed this issue. The fix is to resume the consumer before we read
the position in the latestOffsets method of the DirectKafkaInputDStream class:
c.pause(newPartitions.asJava)
// find latest available offsets
c.seekToEnd(currentOffsets.keySet.asJava)
c.resume(newPartitions.asJava)  // part of fix
c.resume(c.assignment())        // part of fix - resume what was paused in the ConsumerStrategy class
parts.map(tp => tp -> c.position(tp)).toMap
I have tested this fix and it works fine. The reason the issue is not seen with
the current setup is that the pause/resume logic changed in the latest Kafka
version. There is no resume matching the pause, hence this fix is necessary. I
would be happy to make these changes if they look fine.
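For context, here is a sketch of how the patched latestOffsets method could
look. Apart from the two resume calls, the surrounding lines are paraphrased
from the Spark 2.0.x DirectKafkaInputDStream and may not match the source
exactly:

protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  paranoidPoll(c)
  val parts = c.assignment().asScala
  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)
  // position for new partitions determined by latest leader offsets
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
  // don't want to consume messages, so pause before seeking
  c.pause(newPartitions.asJava)
  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  c.resume(newPartitions.asJava)  // part of fix
  c.resume(c.assignment())        // part of fix - resume what ConsumerStrategy paused
  parts.map(tp => tp -> c.position(tp)).toMap
}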
> Messages being received only from one partition when using Spark Streaming
> integration for Kafka 0.10 with kafka client library at 0.10.1
> -----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-18779
> URL: https://issues.apache.org/jira/browse/SPARK-18779
> Project: Spark
> Issue Type: Improvement
> Components: DStreams
> Affects Versions: 2.0.2
> Reporter: Pranav Nakhe
>
> I apologize for the earlier description, which wasn't very clear about the
> issue. I will give a detailed description and my use case now -
> I have a spark application running which consumes kafka messages using the
> Spark Kafka 0.10 integration. I now need to stop my spark application, and
> the user then specifies a timestamp in the past from which the spark
> application should start reading messages (replaying messages). The
> timestamp is mapped to a kafka offset using the 'offsetsForTimes' API in
> KafkaConsumer, introduced in the 10.1.0 client of Kafka. Those offsets are
> then used to create the DStream (see the sketch after this description).
> Because Kafka 10.0.1 does not have the 'offsetsForTimes' API, I need to use
> Kafka 10.1.0.
> So to achieve that behavior I replaced the 10.0.1 jar in the Spark
> environment with the 10.1.0 jar. Things started working for me, but the
> application could read only messages from the first partition.
> To recreate the issue I wrote a local program with the 10.1.0 jar on the
> classpath:
> ********************************
> import scala.collection.JavaConversions._
> val topics = Set("Z1Topic")
> val topicPartitionOffsetMap = new HashMap[TopicPartition, Long]()
> // hardcoded offsets to 10 instead of getting them from 'offsetsForTimes'
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 0), 10L)
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 1), 10L)
> val stream = KafkaUtils.createDirectStream[String, String](
>   ssc, PreferBrokers, Subscribe[String, String](topics, kafkaParams, topicPartitionOffsetMap))
> val x = stream.map(x => x.value())
> x.print()
> ********************************
> This printed only the messages from the first partition, starting at offset
> 10. (This is with the 10.1.0 client.)
> If I use the Kafka 10.0.1 client for the above program, things work fine and
> I receive messages from all partitions, but I can't use the
> 'offsetsForTimes' API (because it doesn't exist in the 10.0.1 client).
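For reference, here is a minimal sketch of the timestamp-to-offset lookup
described above, assuming the 10.1.0 kafka-clients jar is on the classpath.
The bootstrap servers, group id, and the one-hour replay window are
placeholders; only the topic name is taken from the snippet above:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// placeholder consumer configuration - adjust for the actual cluster
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "replay-group")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
val replayFromMillis = System.currentTimeMillis() - 3600 * 1000L  // e.g. replay the last hour

// query every partition of the topic for the first offset at or after the timestamp
val partitions = consumer.partitionsFor("Z1Topic").asScala
  .map(pi => new TopicPartition(pi.topic(), pi.partition()))
val query = partitions.map(tp => tp -> java.lang.Long.valueOf(replayFromMillis)).toMap.asJava

// offsetsForTimes is only available from the 10.1.0 client onwards; a null entry
// means no message at or after the requested timestamp exists for that partition
val startOffsets = consumer.offsetsForTimes(query).asScala.collect {
  case (tp, oat) if oat != null => tp -> oat.offset()
}.toMap
consumer.close()

// startOffsets can then be passed as the offsets argument of Subscribe(...) when
// creating the direct stream, as in the reproduction snippet above.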