[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593285#comment-14593285 ]

Dibyendu Bhattacharya commented on SPARK-8474:
----------------------------------------------

Yes, right.

Maybe this is a false alarm... I did not see any issue with the logic. As I 
see it, KafkaRDD keeps pulling messages in chunks of fetch.message.max.bytes 
(1 MB) on every fetchBatch, and it keeps doing so until it reaches the 
untilOffset, so I may be wrong here. I hit the issue once and have not been 
able to reproduce it since. I shared the executor trace from that run, and I 
can see some OffsetOutOfRange errors in it. I am not sure how those occurred, 
since I launched the receiver for the very first time, starting from the 
earliest offset.
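
To make that loop concrete, here is a minimal sketch of the behaviour I am 
describing. This is not the actual KafkaRDD source; the ~1 KB message size, 
the offsets, and the chunk arithmetic are illustrative assumptions only.

object FetchLoopSketch {
  // Illustrative assumptions: ~1 KB messages, default 1 MB fetch size.
  val fetchMessageMaxBytes = 1024 * 1024
  val avgMessageSize = 1024

  // Stand-in for one fetchBatch call: returns at most as many offsets as fit in one chunk.
  def fetchBatch(fromOffset: Long, untilOffset: Long): Seq[Long] = {
    val maxMessages = fetchMessageMaxBytes / avgMessageSize
    fromOffset until math.min(fromOffset + maxMessages, untilOffset)
  }

  def main(args: Array[String]): Unit = {
    val fromOffset = 0L
    val untilOffset = 2000L   // e.g. maxRatePerPartition allows 2000 messages in the batch
    var requestOffset = fromOffset
    var fetches = 0
    // Keep fetching chunk after chunk until the whole offset range is consumed.
    while (requestOffset < untilOffset) {
      val batch = fetchBatch(requestOffset, untilOffset)
      requestOffset = batch.last + 1
      fetches += 1
    }
    // Mirrors the assert in KafkaRDD: reached only once the range was fully consumed.
    assert(requestOffset == untilOffset, "ran out of messages before reaching untilOffset")
    println(s"Consumed offsets $fromOffset -> $untilOffset in $fetches fetches")
  }
}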

Just to mention, on all subsequent runs I never see output like the 
following, which is from the shared log:

15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 0 
offsets 0 -> 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 1 
offsets 0 -> 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 2 
offsets 0 -> 2338

There must have been some problem in getting the offset ranges, which look 
wrong to me. This is a very old topic, and its offsets cannot start from 
zero (0).
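
One way to sanity-check that would be to ask the broker directly for the 
earliest retained offset of a partition, using the old SimpleConsumer offset 
API. This is only a rough sketch under assumptions: the broker host/port, 
partition number, and client id are placeholders, not values from my setup.

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

object EarliestOffsetCheck {
  def main(args: Array[String]): Unit = {
    // Assumed broker location; adjust to the actual cluster.
    val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "offset-check")
    try {
      val tp = TopicAndPartition("valid_subpub", 0)
      // EarliestTime asks for the oldest offset still retained by the broker.
      val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
      val response = consumer.getOffsetsBefore(request)
      val earliest = response.partitionErrorAndOffsets(tp).offsets.head
      // For an old topic with retention in effect, this should normally be greater than 0.
      println(s"Earliest retained offset for $tp = $earliest")
    } finally {
      consumer.close()
    }
  }
}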

> [STREAMING] Kafka DirectStream API stops receiving messages if collective 
> size of the messages specified in spark.streaming.kafka.maxRatePerPartition 
> exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8474
>                 URL: https://issues.apache.org/jira/browse/SPARK-8474
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.0
>            Reporter: Dibyendu Bhattacharya
>            Priority: Critical
>
> The issue is that if Kafka contains variable-size messages, ranging from a few 
> KB to a few hundred KB, then setting the rate limit by number of messages can 
> lead to a problem.
> Say the message sizes in Kafka are such that, within the default 
> fetch.message.max.bytes limit (which is 1 MB), ONLY 1000 messages can be 
> pulled, whereas spark.streaming.kafka.maxRatePerPartition is set to, say, 
> 2000. With these settings, when KafkaRDD pulls messages for its offset 
> range, it will pull only 1000 messages (limited by the size of the pull in 
> the SimpleConsumer API) and will never be able to reach the desired 
> untilOffset, so KafkaRDD fails in this assert call:
> assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
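
For context, below is a minimal sketch of the kind of direct-stream setup the 
description above refers to, just to show where the two limits live: the 
per-partition rate limit is counted in messages, while the consumer fetch 
size is counted in bytes. The broker address, topic, batch interval and 
numeric values are illustrative assumptions, not taken from the report.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("direct-stream-sketch")
      .setMaster("local[2]")                                  // assumed local run
      // Rate limit counted in *messages* per partition per second (2000 here)...
      .set("spark.streaming.kafka.maxRatePerPartition", "2000")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map(
      "metadata.broker.list"    -> "localhost:9092",          // assumed broker
      "auto.offset.reset"       -> "smallest",                // start from the earliest offset
      "fetch.message.max.bytes" -> (1024 * 1024).toString     // ...while the fetch size is counted in *bytes* (1 MB default)
    )
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("valid_subpub"))

    stream.foreachRDD(rdd => println(s"batch record count = ${rdd.count()}"))
    ssc.start()
    ssc.awaitTermination()
  }
}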


