When you say the patch is not suitable, can you clarify why?

Probably best to get the various findings centralized on
https://issues.apache.org/jira/browse/SPARK-17147

Happy to help with getting the patch up to date and working.

On Wed, Jan 24, 2018 at 1:19 AM, namesuperwood <namesuperw...@gmail.com> wrote:
> It seems this patch is not suitable for our problem.
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
>
> wood.super
>
>  Original Message
> From: namesuperwood<namesuperw...@gmail.com>
> To: Justin Miller<justin.mil...@protectwise.com>
> Cc: user<user@spark.apache.org>; Cody Koeninger<c...@koeninger.org>
> Sent: Wednesday, January 24, 2018, 14:45
> Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure
>
> Yes. My Spark Streaming application works with an uncompacted topic. I will
> check the patch.
>
>
> wood.super
>
>  Original Message
> From: Justin Miller<justin.mil...@protectwise.com>
> To: namesuperwood<namesuperw...@gmail.com>
> Cc: user<user@spark.apache.org>; Cody Koeninger<c...@koeninger.org>
> Sent: Wednesday, January 24, 2018, 14:23
> Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure
>
> We appear to be kindred spirits; I’ve recently run into the same issue. Are
> you running compacted topics? I’ve hit this issue on non-compacted topics
> as well; it happens rarely but is still a pain. You might check out this
> patch and the related Spark Streaming Kafka ticket:
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
> https://issues.apache.org/jira/browse/SPARK-17147
>
> I’ll be testing out the patch on a somewhat large-scale stream processor
> tomorrow.
>
> CCing: Cody Koeninger
>
> Best,
> Justin
>
> On Jan 23, 2018, at 10:48 PM, namesuperwood <namesuperw...@gmail.com> wrote:
>
> Hi all
>
> kafka version: kafka_2.11-0.11.0.2
> spark version: 2.0.1
>
> There is a topic-partition "adn-tracking,15" in Kafka whose earliest offset
> is 1255644602 and whose latest offset is 1271253441.
>
> While starting a Spark Streaming job to process the data from this topic, we
> got an exception: "Got wrong record XXXX even after seeking to offset
> 1266921577". [ (earliest offset) 1255644602 < 1266921577 < 1271253441 (latest offset) ]
>
> Finally, I found the following source code in the class CachedKafkaConsumer
> from spark-streaming-kafka. The failure occurs because the offset of the
> record returned by the consumer's poll does not equal the offset the
> consumer seeked to.
>
>
> Here is the “CachedKafkaConsumer.scala” code:
>
> def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>   logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
>   if (offset != nextOffset) {
>     logInfo(s"Initial fetch for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>   }
>
>   if (!buffer.hasNext()) { poll(timeout) }
>   assert(buffer.hasNext(),
>     s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>   var record = buffer.next()
>
>   if (record.offset != offset) {
>     logInfo(s"Buffer miss for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>     assert(buffer.hasNext(),
>       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>     record = buffer.next()
>     assert(record.offset == offset,
>       s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
>   }
>
>   nextOffset = offset + 1
>   record
> }
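>
> For comparison, here is a minimal sketch of a gap-tolerant variant of
> get(). This illustrates the general idea behind the linked patch and is
> NOT the actual SPARK-17147 code: instead of asserting when the polled
> record lands past the requested offset, it accepts the gap, since offsets
> in a compacted (or transactional) topic need not be consecutive.
>
> // Minimal sketch, assuming the same fields (buffer, nextOffset, groupId,
> // topic, partition) and helpers (seek, poll) as the method above.
> // Hypothetical method name; not the real patch.
> def getTolerant(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>   if (offset != nextOffset) {
>     seek(offset)
>     poll(timeout)
>   }
>   if (!buffer.hasNext()) { poll(timeout) }
>   assert(buffer.hasNext(),
>     s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>   val record = buffer.next()
>   // A record before the requested offset would be a real bug; a record
>   // after it is just a gap left by compaction, and is accepted.
>   assert(record.offset >= offset,
>     s"Got a record before offset $offset for $groupId $topic $partition")
>   nextOffset = record.offset + 1
>   record
> }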
>
> I reproduced this problem and found that the offsets within this
> topic-partition are discontinuous in Kafka (log compaction, for example,
> removes records but never reuses their offsets). I think this is a bug that
> needs to be repaired.
>
> I implemented a simple project that uses a consumer to seek to offset
> 1266921577, but it returned the record at offset 1266921578. When seeking to
> 1266921576, it returned offset 1266921576 exactly.
>
> Here is the code:
>
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class ConsumerDemo {
>     public static void main(String[] argv) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "172.31.29.31:9091");
>         props.put("group.id", "consumer-tutorial-demo");
>         props.put("key.deserializer", StringDeserializer.class.getName());
>         props.put("value.deserializer", StringDeserializer.class.getName());
>
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>         TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
>         Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
>         collection.add(tp);
>         consumer.assign(collection);
>
>         // Seek to a specific offset and poll; the offset of the first record
>         // returned shows whether the requested offset still exists.
>         consumer.seek(tp, 1266921576);
>         ConsumerRecords<String, String> consumerRecords = consumer.poll(10000);
>         List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
>         Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
>         ConsumerRecord<String, String> record = iter.next();
>         System.out.println("the next record " + record.offset() + " record topic " + record.topic());
>     }
> }
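>
> Not part of the original thread: to generalize that single-offset probe, a
> small scanner can walk a range of the partition and report every offset
> gap. This is a hypothetical Scala sketch; the broker address, topic, and
> starting offset are reused from the demo above and are assumptions about
> the environment.
>
> import java.util.{Collections, Properties}
> import scala.collection.JavaConverters._
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.kafka.common.TopicPartition
> import org.apache.kafka.common.serialization.StringDeserializer
>
> object GapScanner {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "172.31.29.31:9091") // assumed broker, from the demo
>     props.put("group.id", "gap-scanner-demo")           // hypothetical group id
>     props.put("key.deserializer", classOf[StringDeserializer].getName)
>     props.put("value.deserializer", classOf[StringDeserializer].getName)
>
>     val consumer = new KafkaConsumer[String, String](props)
>     val tp = new TopicPartition("adn-tracking-click", 15)
>     consumer.assign(Collections.singletonList(tp))
>     consumer.seek(tp, 1266921570L)
>
>     // Poll a bounded number of batches and flag every place where the next
>     // record's offset is not previousOffset + 1.
>     var expected = -1L
>     for (_ <- 1 to 10) {
>       for (r <- consumer.poll(10000L).records(tp).asScala) {
>         if (expected >= 0 && r.offset != expected)
>           println(s"offset gap: expected $expected, got ${r.offset}")
>         expected = r.offset + 1
>       }
>     }
>     consumer.close()
>   }
> }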
>
> wood.super
