[ 
https://issues.apache.org/jira/browse/SPARK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu resolved SPARK-12002.
----------------------------------
       Resolution: Fixed
         Assignee: Saisai Shao
    Fix Version/s: 1.6.0

> offsetRanges attribute missing in Kafka RDD when resuming from checkpoint
> -------------------------------------------------------------------------
>
>                 Key: SPARK-12002
>                 URL: https://issues.apache.org/jira/browse/SPARK-12002
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>            Reporter: Amit Ramesh
>            Assignee: Saisai Shao
>             Fix For: 1.6.0
>
>
> SPARK-8389 added offsetRanges to Kafka direct streams. And SPARK-10122 fixed 
> the issue of not ending up with non-Kafka RDDs when chaining transforms to 
> Kafka RDDs. It appears that this issue remains for the case where a streaming 
> application using Kafka direct streams is initialized from the checkpoint 
> directory. The following is a representative example where everything works 
> as expected during the first run, but exceptions are thrown on a subsequent 
> run when the context is being initialized from the checkpoint directory.
> {code:title=test_checkpoint.py|language=python}
> from pyspark import SparkContext                                              
>                                               
> from pyspark.streaming import StreamingContext                                
>                                               
> from pyspark.streaming.kafka import KafkaUtils                                
>                                               
> def attach_kafka_metadata(kafka_rdd):                                         
>                                               
>     offset_ranges = kafka_rdd.offsetRanges()                                  
>                                               
>                                                                               
>                                               
>     return kafka_rdd                                                          
>                                               
>                                                                               
>                                               
>                                                                               
>                                               
> def create_context():                                                         
>                                               
>     sc = SparkContext(appName='kafka-test')                                   
>                                               
>     ssc = StreamingContext(sc, 10)                                            
>                                               
>     ssc.checkpoint(CHECKPOINT_URI)                                            
>                                               
>                                                                               
>                                               
>     kafka_stream = KafkaUtils.createDirectStream(                             
>                                               
>         ssc,                                                                  
>                                               
>         [TOPIC],                                                              
>                                               
>         kafkaParams={                                                         
>                                               
>             'metadata.broker.list': BROKERS,                                  
>                                               
>         },                                                                    
>                                               
>     )                                                                         
>                                               
>     kafka_stream.transform(attach_kafka_metadata).count().pprint()            
>                                               
>                                                                               
>                                               
>     return ssc                                                                
>                                               
>                                                                               
>                                               
>                                                                               
>                                               
> if __name__ == "__main__":                                                    
>                                               
>     ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context)        
>                                               
>     ssc.start()                                                               
>                                               
>     ssc.awaitTermination()
> {code}
> {code:title=Exception on resuming from checkpoint}
> Traceback (most recent call last):
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", 
> line 62, in call
>     r = self.func(t, *rdds)
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", 
> line 344, in <lambda>
>   File "/home/spark/batch/test_checkpoint.py", line 12, in 
> attach_kafka_metadata
>     offset_ranges = kafka_rdd.offsetRanges()
> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to