When you say the patch is not suitable, can you clarify why? Probably best to get the various findings centralized on https://issues.apache.org/jira/browse/SPARK-17147
Happy to help with getting the patch up to date and working.

On Wed, Jan 24, 2018 at 1:19 AM, namesuperwood <namesuperw...@gmail.com> wrote:

> It seems this patch is not suitable for our problem.
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
>
> wood.super
>
> Original Message
> From: namesuperwood <namesuperw...@gmail.com>
> To: Justin Miller <justin.mil...@protectwise.com>
> Cc: user <user@spark.apache.org>; Cody Koeninger <c...@koeninger.org>
> Sent: Wednesday, Jan 24, 2018, 14:45
> Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure
>
> Yes. My Spark Streaming application works with an uncompacted topic. I will
> check the patch.
>
> wood.super
>
> Original Message
> From: Justin Miller <justin.mil...@protectwise.com>
> To: namesuperwood <namesuperw...@gmail.com>
> Cc: user <user@spark.apache.org>; Cody Koeninger <c...@koeninger.org>
> Sent: Wednesday, Jan 24, 2018, 14:23
> Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure
>
> We appear to be kindred spirits; I’ve recently run into the same issue. Are
> you running compacted topics? I’ve hit this on non-compacted topics as
> well. It happens rarely, but is still a pain. You might check out this
> patch and the related Spark Streaming Kafka ticket:
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
> https://issues.apache.org/jira/browse/SPARK-17147
>
> I’ll be testing out the patch on a somewhat large-scale stream processor
> tomorrow.
>
> CCing: Cody Koeninger
>
> Best,
> Justin
>
> On Jan 23, 2018, at 10:48 PM, namesuperwood <namesuperw...@gmail.com> wrote:
>
> Hi all,
>
> kafka version: kafka_2.11-0.11.0.2
> spark version: 2.0.1
>
> The topic-partition "adn-tracking,15" in Kafka has earliest offset
> 1255644602 and latest offset 1271253441.
>
> While starting a Spark Streaming job to process data from this topic, we
> got an exception: "Got wrong record XXXX even after seeking to offset
> 1266921577".
> [ (earliest offset) 1255644602 < 1266921577 < 1271253441 (latest offset) ]
>
> I traced it to the following code in class CachedKafkaConsumer from
> spark-streaming. The failure is evidently because the offset returned by
> the consumer's poll and the offset the consumer seeks to are not equal.
>
> Here is the "CachedKafkaConsumer.scala" code:
>
> def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>   logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
>   if (offset != nextOffset) {
>     logInfo(s"Initial fetch for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>   }
>
>   if (!buffer.hasNext()) { poll(timeout) }
>   assert(buffer.hasNext(),
>     s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>   var record = buffer.next()
>
>   if (record.offset != offset) {
>     logInfo(s"Buffer miss for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>     assert(buffer.hasNext(),
>       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>     record = buffer.next()
>     assert(record.offset == offset,
>       s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
>   }
>
>   nextOffset = offset + 1
>   record
> }
>
> I reproduced this problem and found that the offsets within one
> topic-partition are not contiguous in Kafka. I think this is a bug that
> needs to be fixed.
>
> I implemented a simple project that uses a consumer to seek to offset
> 1266921577, but it returns the record at offset 1266921578.
> Seeking to 1266921576, however, returns offset 1266921576 exactly.
>
> Here is the code:
>
> import java.util.*;
> import org.apache.kafka.clients.consumer.*;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class ConsumerDemo {
>
>   public static void main(String[] argv) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "172.31.29.31:9091");
>     props.put("group.id", "consumer-tutorial-demo");
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>
>     TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
>     Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
>     collection.add(tp);
>     consumer.assign(collection);
>     consumer.seek(tp, 1266921576);
>
>     ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
>     List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
>     Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
>     ConsumerRecord<String, String> record = iter.next();
>     System.out.println(" the next record " + record.offset() + " record topic " + record.topic());
>   }
> }
>
> wood.super

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
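As an addendum to the thread above: the exact-match assertion in get() is what trips when the log has holes (log compaction and transaction markers both leave missing offsets), and the natural relaxation is to return the first record at or after the requested offset instead of asserting equality. Below is a minimal, broker-free sketch of that relaxed lookup; the Record class and fetchAtOrAfter helper are illustrative stand-ins, not Spark's or Kafka's actual API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

public class GapTolerantFetch {

    // Illustrative stand-in for a Kafka ConsumerRecord: just an offset and a value.
    static final class Record {
        final long offset;
        final String value;
        Record(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }
    }

    // Relaxed lookup: return the first record whose offset is >= the requested
    // offset, skipping holes left by compaction or transaction markers, instead
    // of asserting record.offset == requested as the quoted get() method does.
    static Record fetchAtOrAfter(List<Record> buffer, long requested) {
        for (Record r : buffer) {
            if (r.offset >= requested) {
                return r;
            }
        }
        throw new NoSuchElementException("no record at or after offset " + requested);
    }

    public static void main(String[] args) {
        // Simulated partition with a hole: offset 1266921577 is missing,
        // mirroring the seek experiment described in the thread.
        List<Record> buffer = Arrays.asList(
            new Record(1266921576L, "a"),
            new Record(1266921578L, "b"));

        // Exact-match semantics would fail here; relaxed semantics skip ahead.
        System.out.println(fetchAtOrAfter(buffer, 1266921577L).offset); // prints 1266921578
    }
}
```

With these semantics, a seek into a missing offset (as in the 1266921577 case above) yields the next surviving record rather than an assertion failure, at the cost of the caller no longer being able to assume it received exactly the offset it asked for.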