Liang-Chi Hsieh commented on SPARK-23970:

I think the documentation of {{coalesce}} might answer this; let me quote it:

   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can call repartition. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).

When you do a {{coalesce}}, you make the whole upstream computation take place 
on that smaller number of partitions.
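
If the goal is 32 output files without collapsing the upstream read and filter 
into 32 tasks, {{repartition}} is the usual alternative. A minimal sketch of 
the difference (the query, table name, and output paths below are placeholders, 
not taken from this report):

{code:python}
# Placeholder query; substitute the real select/filter.
df = spark.sql("select col_a, col_b from inputtbl where col_c > 0")

# coalesce(32) is folded into the upstream stage: the scan and filter
# themselves run in only 32 tasks, with no shuffle.
df.coalesce(32).write.parquet("/tmp/out_coalesce")

# repartition(32) inserts a shuffle boundary: the scan and filter keep
# the input's full parallelism, and only the post-shuffle write stage
# runs in 32 tasks.
df.repartition(32).write.parquet("/tmp/out_repartition")
{code}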


> pyspark - simple filter/select doesn't use all tasks when coalesce is set
> -------------------------------------------------------------------------
>                 Key: SPARK-23970
>                 URL: https://issues.apache.org/jira/browse/SPARK-23970
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.0, 2.2.1
>            Reporter: Matthew Anthony
>            Priority: Major
> Running in (py)spark 2.2. 
> Marking this as PySpark, but I have not confirmed whether this is Spark-wide; 
> I've observed it in pyspark, which is my preferred API.
> {code:java}
> df = spark.sql(
> """
> select <redacted>
> from <inputtbl>
> where <conditions>
> """
> )
> df.coalesce(32).write.parquet(...){code}
> The above code will only attempt to use 32 tasks to read and process all of 
> the original input data. This compares to 
> {code:java}
> df = spark.sql(
> """
> select <redacted>
> from <inputtbl>
> where <conditions>
> """
> ).cache()
> df.count()
> df.coalesce(32).write.parquet(...){code}
> where this will use the full complement of tasks available to the cluster to 
> do the initial filter, with a subsequent shuffle to coalesce and write. The 
> latter execution path is far more efficient, particularly at large volumes 
> where filtering removes most records, and should be the default. Note that 
> in the real setting in which I am running this, I'm operating a 20-node 
> cluster with 16 cores and 56 GB RAM per machine, and processing well over a TB 
> of raw data in <inputtbl>. The scale of the task I am testing on generates 
> approximately 300,000 read tasks in the latter version of the code when not 
> constrained by the former's execution plan.
