[
https://issues.apache.org/jira/browse/SPARK-10122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14704674#comment-14704674
]
Amit Ramesh edited comment on SPARK-10122 at 8/20/15 10:51 AM:
---------------------------------------------------------------
[~srowen] as you can see in the example, offsetRanges() is applied to the
initial RDD as part of the transform operation. The code works fine if the
line 'kafka_stream.transform(attach_kafka_metadata).count().pprint()' is
changed to 'kafka_stream.transform(attach_kafka_metadata).pprint()'.
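To make the contrast concrete:
{code:title=Failing vs. working|collapse=true}
# Fails: by the time attach_kafka_metadata runs, the RDD it receives is a
# plain RDD, which has no offsetRanges() method (see the stack trace)
kafka_stream.transform(attach_kafka_metadata).count().pprint()

# Works: the transform function receives the Kafka RDD, so offsetRanges()
# is available
kafka_stream.transform(attach_kafka_metadata).pprint()
{code}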
> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> ------------------------------------------------------------
>
> Key: SPARK-10122
> URL: https://issues.apache.org/jira/browse/SPARK-10122
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Streaming
> Reporter: Amit Ramesh
> Priority: Critical
> Labels: kafka
>
> SPARK-8389 added the offsetRanges() interface to Kafka direct streams. This,
> however, appears to break when further operations are chained after a
> transform operation. The following example code results in an error (stack
> trace below). Note that if the count() operation is removed from the example
> code, the error no longer occurs and the Kafka data is printed.
> {code:title=kafka_test.py|collapse=true}
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
>
> def attach_kafka_metadata(kafka_rdd):
>     offset_ranges = kafka_rdd.offsetRanges()
>     return kafka_rdd
>
>
> if __name__ == "__main__":
>     sc = SparkContext(appName='kafka-test')
>     ssc = StreamingContext(sc, 10)
>     kafka_stream = KafkaUtils.createDirectStream(
>         ssc,
>         [TOPIC],
>         kafkaParams={
>             'metadata.broker.list': BROKERS,
>         },
>     )
>     kafka_stream.transform(attach_kafka_metadata).count().pprint()
>     ssc.start()
>     ssc.awaitTermination()
> {code}
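> A possible interim workaround (a sketch only; count_and_print is an
> illustrative helper, not part of any API) is to keep offsetRanges() in the
> first transform and do the per-batch counting inside foreachRDD instead of
> chaining count() on the transformed stream. This matches the observation
> that pprint() directly after transform works:
> {code:title=Workaround sketch|collapse=true}
> def count_and_print(time, rdd):
>     # Counting here avoids chaining count() on the transformed DStream,
>     # which is what triggers the AttributeError below
>     print('%s: %d records' % (time, rdd.count()))
>
> kafka_stream.transform(attach_kafka_metadata).foreachRDD(count_and_print)
> {code}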
> {code:title=Stack trace|collapse=true}
> Traceback (most recent call last):
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
>     r = self.func(t, *rdds)
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 616, in <lambda>
>     self.func = lambda t, rdd: func(t, prev_func(t, rdd))
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 332, in <lambda>
>     func = lambda t, rdd: oldfunc(rdd)
>   File "/home/spark/ad_realtime/batch/kafka_test.py", line 7, in attach_kafka_metadata
>     offset_ranges = kafka_rdd.offsetRanges()
> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> {code}
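> For reference, the usage pattern that SPARK-8389 was intended to enable
> captures the offsets in a transform and consumes them in a foreachRDD (a
> sketch adapted from the Kafka direct stream documentation, with names
> changed to match this file's style). It appears unaffected because no
> further DStream operations are chained on the transformed stream:
> {code:title=Offset capture pattern|collapse=true}
> offset_ranges = []
>
> def store_offset_ranges(rdd):
>     # offsetRanges() must be called here, while rdd is still the Kafka RDD
>     global offset_ranges
>     offset_ranges = rdd.offsetRanges()
>     return rdd
>
> def print_offset_ranges(rdd):
>     for o in offset_ranges:
>         print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
>
> kafka_stream.transform(store_offset_ranges).foreachRDD(print_offset_ranges)
> {code}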