[ https://issues.apache.org/jira/browse/SPARK-27894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeffrey(Xilang) Yan updated SPARK-27894:
----------------------------------------
    Description: 
In PySpark Streaming, if checkpointing is enabled and there is a transform-join 
operation, the following error is thrown.
{code:python}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName='xxxx')
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 10)
ssc.checkpoint("hdfs://xxxx/test")

kafka_bootstrap_servers = "xxxx"
topics = ['xxxx', 'xxxx']

# Small lookup RDD created on the driver
doc_info = sc.parallelize(((1, 2), (4, 5), (7, 8), (10, 11)))
kvds = KafkaUtils.createDirectStream(
    ssc, topics, kafkaParams={"metadata.broker.list": kafka_bootstrap_servers})

line = kvds.map(lambda x: (1, 2))

# Joining the per-batch RDD with doc_info inside transform triggers the error
line.transform(lambda rdd: rdd.join(doc_info)).pprint(10)

ssc.start()
ssc.awaitTermination()
{code}
 

Error details:
{code:java}
PicklingError: Could not serialize object: Exception: It appears that you are 
attempting to broadcast an RDD or reference an RDD from an action or 
transformation. RDD transformations and actions can only be invoked by the 
driver, not inside of other transformations; for example, rdd1.map(lambda x: 
rdd2.values.count() * x) is invalid because the values transformation and count 
action cannot be performed inside of the rdd1.map transformation. For more 
information, see SPARK-5063.
{code}
Equivalent code works fine in Scala. Moreover, if we remove either
{code:java}
ssc.checkpoint("hdfs://xxxx/test") 
{code}
or
{code:java}
line.transform(lambda rdd:rdd.join(doc_info))
{code}
the error no longer occurs.

 

It seems that when checkpointing is enabled, PySpark serializes the transform 
lambda and, with it, the RDD referenced by the lambda; since an RDD cannot be 
serialized, the error is raised.
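
One possible workaround, sketched below, is to broadcast the small lookup data and emulate the join inside the transform so that the lambda never captures an RDD. This is only a minimal sketch, not part of the original report; the names doc_info_data, doc_info_bc, and join_with_doc_info are illustrative.
{code:python}
# Assumption: the doc_info pairs are small enough to keep on the driver and broadcast.
doc_info_data = dict(((1, 2), (4, 5), (7, 8), (10, 11)))  # same pairs as doc_info
doc_info_bc = sc.broadcast(doc_info_data)

def join_with_doc_info(rdd):
    # Emulate an inner join against the broadcast lookup table:
    # (k, v) -> (k, (v, doc_info_data[k])) for keys present in the table.
    return rdd.flatMap(
        lambda kv: [(kv[0], (kv[1], doc_info_bc.value[kv[0]]))]
        if kv[0] in doc_info_bc.value else [])

line.transform(join_with_doc_info).pprint(10)
{code}
Because the closure only captures a broadcast variable rather than an RDD, checkpointing should be able to serialize the transform function without hitting SPARK-5063.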



> PySpark streaming transform RDD join does not work when checkpoint enabled
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-27894
>                 URL: https://issues.apache.org/jira/browse/SPARK-27894
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: Jeffrey(Xilang) Yan
>            Priority: Major
>


