Jeffrey(Xilang) Yan created SPARK-27894:
-------------------------------------------

             Summary: PySpark streaming transform RDD join does not work when 
checkpoint is enabled
                 Key: SPARK-27894
                 URL: https://issues.apache.org/jira/browse/SPARK-27894
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.4.0
            Reporter: Jeffrey(Xilang) Yan


In PySpark Streaming, if checkpointing is enabled and the job contains a 
transform-join operation, the following error is thrown:

{code:python}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName='xxxx')
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 10)
ssc.checkpoint("hdfs://xxxx/test")

kafka_bootstrap_servers = "xxxx"
topics = ['xxxx', 'xxxx']

# Driver-side RDD used as the right-hand side of the join
doc_info = sc.parallelize(((1, 2), (4, 5), (7, 8), (10, 11)))

kvds = KafkaUtils.createDirectStream(
    ssc, topics, kafkaParams={"metadata.broker.list": kafka_bootstrap_servers})
line = kvds.map(lambda x: (1, 2))

# Joining against the driver-side RDD inside transform triggers the error
line.transform(lambda rdd: rdd.join(doc_info)).pprint(10)

ssc.start()
ssc.awaitTermination()
{code}

Error details:

{code}
PicklingError: Could not serialize object: Exception: It appears that you are
attempting to broadcast an RDD or reference an RDD from an action or
transformation. RDD transformations and actions can only be invoked by the
driver, not inside of other transformations; for example, rdd1.map(lambda x:
rdd2.values.count() * x) is invalid because the values transformation and count
action cannot be performed inside of the rdd1.map transformation. For more
information, see SPARK-5063.
{code}

Similar code works fine in Scala. The error also disappears if we remove either

{{ssc.checkpoint("hdfs://xxxx/test")}}

or

{{line.transform(lambda rdd: rdd.join(doc_info))}}

 

It seems that when checkpointing is enabled, PySpark serializes the transform 
lambda and, along with it, the RDD referenced by the lambda; since an RDD 
cannot be serialized, the error is raised.
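
A possible workaround, sketched below under the assumption that {{doc_info}} 
is small enough to collect onto the driver: ship the lookup data as a 
broadcast variable and emulate the join inside {{transform}}, so the lambda 
captures only the broadcast handle rather than an RDD. The names 
{{doc_info_bc}} and {{map_side_join}} are illustrative, not from the original 
report, and this sketch has not been verified against the checkpoint code 
path.

{code:python}
# Sketch of a workaround: broadcast the lookup data instead of
# referencing the doc_info RDD from inside the transform lambda.
# A broadcast handle pickles cleanly with the closure, whereas an
# RDD does not.
doc_info_bc = sc.broadcast(dict(((1, 2), (4, 5), (7, 8), (10, 11))))

def map_side_join(rdd):
    # Emulate an inner join: emit (key, (left_value, right_value))
    # only for keys present in the broadcast lookup table. The
    # flatMap lambda captures only the broadcast handle; the value
    # is fetched lazily on the executors.
    return rdd.flatMap(
        lambda kv: [(kv[0], (kv[1], doc_info_bc.value[kv[0]]))]
        if kv[0] in doc_info_bc.value else [])

line.transform(map_side_join).pprint(10)
{code}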


