This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e08c15b62712 [SPARK-53606][DOCS] Fix MapInPandas/MapInArrow examples with barrier
e08c15b62712 is described below

commit e08c15b62712303942284cb90d6a1b004f69652c
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Wed Sep 17 16:23:59 2025 +0800

    [SPARK-53606][DOCS] Fix MapInPandas/MapInArrow examples with barrier

    ### What changes were proposed in this pull request?

    Fix the MapInPandas/MapInArrow examples that use barrier mode.

    ### Why are the changes needed?

    The two examples never work:

    ```
    In [4]: df.mapInPandas(filter_func, df.schema, barrier=True).collect()
    Out[4]: [Row(id=1, age=21)]

    In [5]: df.mapInPandas(filter_func, df.schema, barrier=True).show()
    25/09/17 12:36:22 WARN DAGScheduler: Creating new stage failed due to exception - job: 5
    org.apache.spark.scheduler.BarrierJobUnsupportedRDDChainException: [SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following pattern of RDD chain within a barrier stage:
    1. Ancestor RDDs that have different number of partitions from the resulting RDD (e.g. union()/coalesce()/first()/take()/PartitionPruningRDD). A workaround for first()/take() can be barrierRdd.collect().head (scala) or barrierRdd.collect()[0] (python).
    2. An RDD that depends on multiple barrier RDDs (e.g. barrierRdd1.zip(barrierRdd2)).
            at org.apache.spark.errors.SparkCoreErrors$.barrierStageWithRDDChainPatternError(SparkCoreErrors.scala:231)
            at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithRDDChainPattern(DAGScheduler.scala:506)
            at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:656)
            at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1342)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3150)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3141)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3130)
            at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:50)
    ```

    ### Does this PR introduce _any_ user-facing change?

    Doc-only changes.

    ### How was this patch tested?

    Manually checked.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #52365 from zhengruifeng/fix_mapin_barrier.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/sql/dataframe.py | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index fca83c165b1d..ddbb7971e588 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -6405,12 +6405,8 @@ class DataFrame:
         barrier mode, it ensures all Python workers in the stage will be launched
         concurrently.
 
-        >>> df.mapInPandas(filter_func, df.schema, barrier=True).show()  # doctest: +SKIP
-        +---+---+
-        | id|age|
-        +---+---+
-        |  1| 21|
-        +---+---+
+        >>> df.mapInPandas(filter_func, df.schema, barrier=True).collect()  # doctest: +SKIP
+        [Row(id=1, age=21)]
 
         See Also
         --------
@@ -6478,12 +6474,8 @@ class DataFrame:
         barrier mode, it ensures all Python workers in the stage will be launched
         concurrently.
 
-        >>> df.mapInArrow(filter_func, df.schema, barrier=True).show()  # doctest: +SKIP
-        +---+---+
-        | id|age|
-        +---+---+
-        |  1| 21|
-        +---+---+
+        >>> df.mapInArrow(filter_func, df.schema, barrier=True).collect()  # doctest: +SKIP
+        [Row(id=1, age=21)]
 
         See Also
         --------
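The fixed doctests rely on a `df` and `filter_func` defined earlier in each docstring, which this diff does not touch. The sketch below is a minimal, self-contained reconstruction of the corrected mapInPandas usage; it assumes a local SparkSession with pandas and pyarrow installed, and a `df`/`filter_func` set up along the lines of the surrounding docstring example. It is illustrative only, not part of the patch.

```python
from pyspark.sql import SparkSession

# Assumed setup (mirrors the docstring example, which is not shown in this diff).
spark = SparkSession.builder.master("local[2]").getOrCreate()
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    # mapInPandas passes an iterator of pandas.DataFrame batches, one stream per partition.
    for pdf in iterator:
        yield pdf[pdf.id == 1]

# Works under barrier mode: collect() keeps the barrier stage's RDD chain intact.
print(df.mapInPandas(filter_func, df.schema, barrier=True).collect())
# [Row(id=1, age=21)]

# The old doctest called .show(), which plans a limit/take on top of the barrier
# RDD; as the stack trace above indicates, the DAGScheduler then rejects the
# stage with BarrierJobUnsupportedRDDChainException.
# df.mapInPandas(filter_func, df.schema, barrier=True).show()  # would raise
```

The mapInArrow hunk follows the same pattern; its `filter_func` would iterate over `pyarrow.RecordBatch` objects instead of pandas DataFrames.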