[
https://issues.apache.org/jira/browse/SPARK-12002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Amit Ramesh updated SPARK-12002:
--------------------------------
Description:
SPARK-8389 added offsetRanges to Kafka direct streams, and SPARK-10122 fixed
the issue of ending up with plain (non-Kafka) RDDs, which lack offsetRanges,
when chaining transforms to Kafka RDDs. That issue appears to remain, however,
when a streaming application using Kafka direct streams is initialized from
the checkpoint directory. The following is a representative example: everything
works as expected during the first run, but exceptions are thrown on a
subsequent run when the context is initialized from the checkpoint directory.
{code:title=test_checkpoint.py|language=python}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# CHECKPOINT_URI, TOPIC, and BROKERS are placeholders; set them for your environment.


def attach_kafka_metadata(kafka_rdd):
    offset_ranges = kafka_rdd.offsetRanges()
    return kafka_rdd


def create_context():
    sc = SparkContext(appName='kafka-test')
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint(CHECKPOINT_URI)

    kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        [TOPIC],
        kafkaParams={
            'metadata.broker.list': BROKERS,
        },
    )
    kafka_stream.transform(attach_kafka_metadata).count().pprint()

    return ssc


if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context)
    ssc.start()
    ssc.awaitTermination()
{code}
{code:title=Exception on resuming from checkpoint}
Traceback (most recent call last):
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 344, in <lambda>
  File "/home/spark/batch/test_checkpoint.py", line 12, in attach_kafka_metadata
    offset_ranges = kafka_rdd.offsetRanges()
AttributeError: 'RDD' object has no attribute 'offsetRanges'
{code}
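A guarded version of the transform function avoids the crash, though it only sidesteps the symptom: on batches recovered from the checkpoint, offsets are simply unavailable. This is a sketch, assuming that silently skipping offset collection is acceptable for the application; it is not a fix for the underlying loss of the KafkaRDD wrapper.
{code:title=Workaround sketch|language=python}
def attach_kafka_metadata(kafka_rdd):
    # After resuming from a checkpoint, the RDD handed to transform() may be
    # a plain RDD rather than a KafkaRDD, so guard the offsetRanges() call.
    # Offsets are then unavailable for the recovered batches.
    if hasattr(kafka_rdd, 'offsetRanges'):
        offset_ranges = kafka_rdd.offsetRanges()
    return kafka_rdd
{code}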
was:
SPARK-8389 added offsetRanges to Kafka direct streams, and SPARK-10122 fixed
the issue of ending up with plain (non-Kafka) RDDs, which lack offsetRanges,
when chaining transforms to Kafka RDDs. That issue appears to remain, however,
when a streaming application using Kafka direct streams is initialized from
the checkpoint directory. The following is a representative example: everything
works as expected during the first run, but exceptions are thrown on a
subsequent run when the context is initialized from the checkpoint directory.
{code:title=test-checkpoint.py|language=python}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def attach_kafka_metadata(kafka_rdd):
    offset_ranges = kafka_rdd.offsetRanges()
    return kafka_rdd


def create_context():
    sc = SparkContext(appName='kafka-test')
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint(CHECKPOINT_URI)

    kafka_stream = KafkaUtils.createDirectStream(
        ssc,
        [TOPIC],
        kafkaParams={
            'metadata.broker.list': BROKERS,
        },
    )
    kafka_stream.transform(attach_kafka_metadata).count().pprint()

    return ssc


if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context)
    ssc.start()
    ssc.awaitTermination()
{code}
{code:title=Exception on resuming from checkpoint}
Traceback (most recent call last):
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
    r = self.func(t, *rdds)
  File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 344, in <lambda>
  File "/home/spark/ad_realtime/batch/test_checkpoint.py", line 12, in attach_kafka_metadata
    offset_ranges = kafka_rdd.offsetRanges()
AttributeError: 'RDD' object has no attribute 'offsetRanges'
{code}
> offsetRanges attribute missing in Kafka RDD when resuming from checkpoint
> -------------------------------------------------------------------------
>
> Key: SPARK-12002
> URL: https://issues.apache.org/jira/browse/SPARK-12002
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Streaming
> Reporter: Amit Ramesh
>
> SPARK-8389 added offsetRanges to Kafka direct streams, and SPARK-10122 fixed
> the issue of ending up with plain (non-Kafka) RDDs, which lack offsetRanges,
> when chaining transforms to Kafka RDDs. That issue appears to remain, however,
> when a streaming application using Kafka direct streams is initialized from
> the checkpoint directory. The following is a representative example: everything
> works as expected during the first run, but exceptions are thrown on a
> subsequent run when the context is initialized from the checkpoint directory.
> {code:title=test_checkpoint.py|language=python}
> from pyspark import SparkContext
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
> 
> 
> def attach_kafka_metadata(kafka_rdd):
>     offset_ranges = kafka_rdd.offsetRanges()
>     return kafka_rdd
> 
> 
> def create_context():
>     sc = SparkContext(appName='kafka-test')
>     ssc = StreamingContext(sc, 10)
>     ssc.checkpoint(CHECKPOINT_URI)
> 
>     kafka_stream = KafkaUtils.createDirectStream(
>         ssc,
>         [TOPIC],
>         kafkaParams={
>             'metadata.broker.list': BROKERS,
>         },
>     )
>     kafka_stream.transform(attach_kafka_metadata).count().pprint()
> 
>     return ssc
> 
> 
> if __name__ == "__main__":
>     ssc = StreamingContext.getOrCreate(CHECKPOINT_URI, create_context)
>     ssc.start()
>     ssc.awaitTermination()
> {code}
> {code:title=Exception on resuming from checkpoint}
> Traceback (most recent call last):
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
>     r = self.func(t, *rdds)
>   File "/home/spark/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 344, in <lambda>
>   File "/home/spark/batch/test_checkpoint.py", line 12, in attach_kafka_metadata
>     offset_ranges = kafka_rdd.offsetRanges()
> AttributeError: 'RDD' object has no attribute 'offsetRanges'
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
