[ https://issues.apache.org/jira/browse/SPARK-18779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pranav Nakhe closed SPARK-18779.
--------------------------------
    Resolution: Not A Problem

I looked at the code and was able to recreate this issue with a Kafka client outside Spark, so I am closing this.

> Messages being received only from one partition when using Spark Streaming
> integration for Kafka 0.10 with kafka client library at 0.10.1
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-18779
>                 URL: https://issues.apache.org/jira/browse/SPARK-18779
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.0.2
>            Reporter: Pranav Nakhe
>
> I apologize for the earlier description, which wasn't very clear about the
> issue. Here is a detailed description of the problem and my use case.
> I have a Spark application that consumes Kafka messages using the
> Spark Kafka 0.10 integration. I now need to stop the application, after which
> the user specifies a timestamp in the past from which the application should
> resume reading messages (replaying messages). The timestamp is mapped to a
> Kafka offset using the 'offsetsForTimes' API in KafkaConsumer, introduced
> in the 0.10.1.0 Kafka client. That offset is then used to create the DStream.
> Because Kafka 0.10.0.1 does not have the 'offsetsForTimes' API, I need to use
> Kafka 0.10.1.0.
> To achieve that, I replaced the 0.10.0.1 jar in the Spark environment
> with the 0.10.1.0 jar. Things started working, but the application could read
> messages only from the first partition.
> To recreate the issue I wrote a local program with the 0.10.1.0 jar on the
> classpath:
> ********************************
> val topics = Set("Z1Topic")
> val topicPartitionOffsetMap = new HashMap[TopicPartition, Long]()
> // offset hardcoded to 10 instead of getting it from 'offsetsForTimes'
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic",0), 10L)
> topicPartitionOffsetMap.put(new TopicPartition("Z1Topic",1), 10L)
> import scala.collection.JavaConversions._
> val stream = KafkaUtils.createDirectStream[String, String](ssc,
>   PreferBrokers, Subscribe[String, String](topics, kafkaParams,
>   topicPartitionOffsetMap))
> val x = stream.map(x => x.value())
> x.print()
> ********************************
> This printed only the messages in the first partition, starting from offset
> 10 (this is with the 0.10.1.0 client).
> If I use the Kafka 0.10.0.1 client for the above program, things work fine
> and I receive messages from all partitions, but I can't use the
> 'offsetsForTimes' API (because it doesn't exist in the 0.10.0.1 client).
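
For reference, the timestamp-to-offset step mentioned in the description could look roughly like the sketch below. It is only an illustration, not the reporter's code: the names `ReplayOffsets`, `offsetsAt` and `withFallback` are hypothetical, and falling back to the end offset for partitions that have no message at or after the timestamp is an assumption about the desired behaviour. It relies on `KafkaConsumer.offsetsForTimes` and `endOffsets`, both of which need kafka-clients 0.10.1.0 or later, and it does not address the single-partition behaviour reported in this issue.

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object ReplayOffsets {
  // Pure helper (hypothetical): partitions with no match (None) get the
  // offset returned by `fallback`.
  def withFallback[K](found: Map[K, Option[Long]], fallback: K => Long): Map[K, Long] =
    found.map { case (k, v) => k -> v.getOrElse(fallback(k)) }

  // For every partition of `topic`, find the earliest offset whose
  // timestamp is >= replayFromMs.
  def offsetsAt(consumer: KafkaConsumer[String, String],
                topic: String,
                replayFromMs: Long): Map[TopicPartition, Long] = {
    val partitions = consumer.partitionsFor(topic).asScala
      .map(p => new TopicPartition(p.topic, p.partition))

    // offsetsForTimes expects a java.util.Map of partition -> timestamp (ms).
    val query = new JHashMap[TopicPartition, java.lang.Long]()
    partitions.foreach(tp => query.put(tp, java.lang.Long.valueOf(replayFromMs)))

    val result = consumer.offsetsForTimes(query)
    // A partition maps to null when no message at or after the timestamp exists.
    val found = partitions.map { tp =>
      tp -> Option(result.get(tp)).map(_.offset)
    }.toMap

    // Assumption: start such partitions at their current end offset.
    val ends = consumer.endOffsets(partitions.asJava)
    withFallback[TopicPartition](found, tp => ends.get(tp).longValue)
  }
}
```

The resulting map has the same shape as `topicPartitionOffsetMap` in the reproduction above, so it could be passed to `Subscribe` in place of the hardcoded offsets.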