Amit Ramesh created SPARK-12002:
-----------------------------------

             Summary: offsetRanges attribute missing in Kafka RDD when resuming 
from checkpoint
                 Key: SPARK-12002
                 URL: https://issues.apache.org/jira/browse/SPARK-12002
             Project: Spark
          Issue Type: Bug
          Components: PySpark, Streaming
            Reporter: Amit Ramesh


SPARK-8389 added offsetRanges to Kafka direct streams. And SPARK-10122 fixed 
the issue of not ending up with non-Kafka RDDs when chaining transforms to 
Kafka RDDs. It appears that this issue remains for the case where a streaming 
application using Kafka direct streams is initialized from the checkpoint 
directory. The following is a representative example where everything works as 
expected during the first run, but exceptions are thrown on a subsequent run 
when the context is being initialized from the checkpoint directory.

{code:title=test-checkpoint.py|language=python}
from pyspark import SparkContext                                                
                                            
from pyspark.streaming import StreamingContext                                  
                                            
from pyspark.streaming.kafka import KafkaUtils                                  
                                            


def attach_kafka_metadata(kafka_rdd):                                           
                                            
    offset_ranges = kafka_rdd.offsetRanges()                                    
                                            
                                                                                
                                            
    return kafka_rdd                                                            
                                            
                                                                                
                                            
                                                                                
                                            
def create_context():                                                           
                                            
    sc = SparkContext(appName='kafka-test')                                     
                                            
    ssc = StreamingContext(sc, 10)                                              
                                            
    ssc.checkpoint(CHECKPOINT_URI)                                              
                                            
                                                                                
                                            
    kafka_stream = KafkaUtils.createDirectStream(                               
                                            
        ssc,                                                                    
                                            
        [TOPIC],                                                                
                                            
        kafkaParams={                                                           
                                            
            'metadata.broker.list': BROKERS,                                    
                                            
        },                                                                      
                                            
    )                                                                           
                                            
    kafka_stream.transform(attach_kafka_metadata).count().pprint()              
                                            
                                                                                
                                            
    return ssc                                                                  
                                            
                                                                                
                                            
                                                                                
                                            
if __name__ == "__main__":                                                      
                                            
    ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context)          
                                            
    ssc.start()                                                                 
                                            
    ssc.awaitTermination()
{code}

{code:title=Exception on resuming from checkpoint}
Traceback (most recent call last):
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", 
line 62, in call
    r = self.func(t, *rdds)
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", 
line 344, in <lambda>
  File "/home/spark/ad_realtime/batch/test_checkpoint.py", line 12, in 
attach_kafka_metadata
    offset_ranges = kafka_rdd.offsetRanges()
AttributeError: 'RDD' object has no attribute 'offsetRanges'
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to