Re: uncontinuous offset in kafka will cause the spark streaming failure

2018-01-24 Thread Cody Koeninger
When you say the patch is not suitable, can you clarify why?

Probably best to get the various findings centralized on
https://issues.apache.org/jira/browse/SPARK-17147

Happy to help with getting the patch up to date and working.
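
For reference, a rough sketch of what driving the patched connector might
look like. This is an assumption based on the patch branch, not merged
behavior: in particular, the config key
spark.streaming.kafka.allowNonConsecutiveOffsets is taken from the
SPARK-17147 work and may change, so verify it against whatever build you
actually run.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object GapTolerantStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("adn-tracking-stream")
      // Assumed config from the patch branch: tolerate offsets removed by
      // compaction instead of failing the batch with "Got wrong record".
      .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "172.31.29.31:9091",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "adn-tracking-consumer",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent,
      Subscribe[String, String](Seq("adn-tracking-click"), kafkaParams))

    stream.foreachRDD(rdd => println(s"batch records: ${rdd.count()}"))
    ssc.start()
    ssc.awaitTermination()
  }
}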

On Wed, Jan 24, 2018 at 1:19 AM, namesuperwood  wrote:
> It seems this patch is not suitable for our problem.
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
>
> wood.super
>
> Original Message
> From: namesuperwood
> To: Justin Miller
> Cc: user; Cody Koeninger
> Sent: Wednesday, January 24, 2018, 14:45
> Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure
>
> Yes. My Spark Streaming application works with an uncompacted topic. I will
> check the patch.
>
>
> wood.super
>
> Original Message
> From: Justin Miller
> To: namesuperwood
> Cc: user; Cody Koeninger
> Sent: Wednesday, January 24, 2018, 14:23
> Subject: Re: uncontinuous offset in kafka will cause the spark streaming failure
>
> We appear to be kindred spirits; I’ve recently run into the same issue. Are
> you running compacted topics? I’ve run into this issue on non-compacted
> topics as well; it happens rarely but is still a pain. You might check out
> this patch and the related Spark Streaming Kafka ticket:
>
> https://github.com/apache/spark/compare/master...koeninger:SPARK-17147
> https://issues.apache.org/jira/browse/SPARK-17147
>
> I’ll be testing out the patch on a somewhat large-scale stream processor
> tomorrow.
>
> CCing: Cody Koeninger
>
> Best,
> Justin
>
> On Jan 23, 2018, at 10:48 PM, namesuperwood  wrote:
>
> Hi all
>
> Kafka version: kafka_2.11-0.11.0.2
> Spark version: 2.0.1
>
> A topic-partition "adn-tracking,15" in Kafka has an earliest offset of
> 1255644602 and a latest offset of 1271253441.
>
> While starting a Spark Streaming job to process data from this topic, we
> got an exception: "Got wrong record even after seeking to offset
> 1266921577". [ (earliest offset) 1255644602 < 1266921577 <
> 1271253441 (latest offset) ]
>
> Finally, I found the following source code in the class CachedKafkaConsumer
> from spark-streaming. The failure is evidently caused by the offset returned
> by the consumer's poll not matching the offset the consumer sought to.
>
> Here is the relevant code from CachedKafkaConsumer.scala:
>
> def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>   logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset")
>   if (offset != nextOffset) {
>     logInfo(s"Initial fetch for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>   }
>
>   if (!buffer.hasNext()) { poll(timeout) }
>   assert(buffer.hasNext(),
>     s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>   var record = buffer.next()
>
>   if (record.offset != offset) {
>     logInfo(s"Buffer miss for $groupId $topic $partition $offset")
>     seek(offset)
>     poll(timeout)
>     assert(buffer.hasNext(),
>       s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>     record = buffer.next()
>     assert(record.offset == offset,
>       s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
>   }
>
>   nextOffset = offset + 1
>   record
> }
>
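> For illustration, here is a minimal sketch of a gap-tolerant variant of
> get() (my own reconstruction, not the actual SPARK-17147 patch; the name
> getTolerant is hypothetical). Instead of asserting strict equality, it
> accepts the first record at or beyond the requested offset, which is what
> a reader has to do when compaction has removed the exact offset:
>
> def getTolerant(offset: Long, timeout: Long): ConsumerRecord[K, V] = {
>   if (offset != nextOffset) {
>     seek(offset)
>     poll(timeout)
>   }
>   if (!buffer.hasNext()) { poll(timeout) }
>   assert(buffer.hasNext(),
>     s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
>   var record = buffer.next()
>   // Skip buffered records below the requested offset, then accept the first
>   // record at or beyond it (the exact offset may have been compacted away).
>   while (record.offset < offset && buffer.hasNext()) {
>     record = buffer.next()
>   }
>   assert(record.offset >= offset,
>     s"Could not reach offset $offset for $groupId $topic $partition")
>   nextOffset = record.offset + 1
>   record
> }
>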
> I reproduced this problem and found that the offsets within one
> topic-partition are not contiguous in Kafka. I think this is a bug that
> needs to be fixed.
>
> I implemented a simple project that uses a consumer to seek to offset
> 1266921577, but it returned the record at offset 1266921578. Seeking to
> 1266921576 instead returned offset 1266921576 exactly.
>
>
> Here is the code:
>
> import java.util.ArrayList;
> import java.util.Collection;
> import java.util.Iterator;
> import java.util.List;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
>
> public class ConsumerDemo {
>
>     public static void main(String[] argv) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "172.31.29.31:9091");
>         props.put("group.id", "consumer-tutorial-demo");
>         props.put("key.deserializer", StringDeserializer.class.getName());
>         props.put("value.deserializer", StringDeserializer.class.getName());
>
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>         TopicPartition tp = new TopicPartition("adn-tracking-click", 15);
>         Collection<TopicPartition> collection = new ArrayList<TopicPartition>();
>         collection.add(tp);
>         consumer.assign(collection);
>
>         // Seek to the offset just below the missing one and poll once.
>         consumer.seek(tp, 1266921576L);
>         ConsumerRecords<String, String> consumerRecords = consumer.poll(1);
>
>         List<ConsumerRecord<String, String>> listR = consumerRecords.records(tp);
>         Iterator<ConsumerRecord<String, String>> iter = listR.iterator();
>         ConsumerRecord<String, String> record = iter.next();
>         System.out.println(" the next record " + record.offset()
>             + " record topic " + record.topic());
>     }
> }
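>
> To pin down where the gap actually is, a small scan along the following
> lines can walk the partition and report every non-contiguous jump. This is
> a sketch: GapScan is a hypothetical name, and the broker address, topic,
> and starting offset are copied from the example above.
>
> import java.util.{Collections, Properties}
>
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import org.apache.kafka.common.TopicPartition
> import org.apache.kafka.common.serialization.StringDeserializer
>
> import scala.collection.JavaConverters._
>
> object GapScan {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "172.31.29.31:9091")
>     props.put("group.id", "gap-scan-demo")
>     props.put("key.deserializer", classOf[StringDeserializer].getName)
>     props.put("value.deserializer", classOf[StringDeserializer].getName)
>
>     val consumer = new KafkaConsumer[String, String](props)
>     val tp = new TopicPartition("adn-tracking-click", 15)
>     consumer.assign(Collections.singletonList(tp))
>     consumer.seek(tp, 1266921570L) // start a little before the suspect offset
>
>     // Track the offset we expect next; any jump is a gap in the log.
>     var expected = -1L
>     for (_ <- 0 until 10) {
>       for (record <- consumer.poll(1000).records(tp).asScala) {
>         if (expected >= 0 && record.offset != expected) {
>           println(s"gap: expected $expected, got ${record.offset}")
>         }
>         expected = record.offset + 1
>       }
>     }
>     consumer.close()
>   }
> }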
>
> wood.super
>
>
