[ https://issues.apache.org/jira/browse/SPARK-24193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102458#comment-17102458 ]

Xianjin YE commented on SPARK-24193:
------------------------------------

I used `df.rdd.collect` intentionally to trigger the problem, because `df.collect` is converted to `SparkPlan.executeTake`, which returns the data correctly.

The problem can also be triggered with a slightly different version:
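{code:scala}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf
    // Utils is private[spark], so this snippet is meant to run from Spark's own test sources.
    import org.apache.spark.util.Utils

    val spark = SparkSession
      .builder
      .appName("Spark TopK test")
      .master("local-cluster[8, 1, 1024]")
      .getOrCreate()
    val temp1 = Utils.createTempDir()
    // 100000 rows in 10 partitions, generated in descending order.
    val data = spark.range(100000, 0, -1, 10).toDF("id").selectExpr("id + 1 as id")

    // Limit (200) >= threshold (100): the planner falls back to a sort followed by a limit.
    spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, 100)
    data.orderBy("id").limit(200).write.mode("overwrite").parquet(temp1.toString)
    val topKInSort = spark.read.parquet(temp1.toString).collect()

    // Threshold set to Int.MaxValue: the in-memory TakeOrderedAndProjectExec path is used.
    spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, Int.MaxValue)
    data.orderBy("id").limit(200).write.mode("overwrite").parquet(temp1.toString)
    val topKInMemory = spark.read.parquet(temp1.toString).collect()

    println(topKInMemory.map(_.getLong(0)).mkString("[", ",", "]"))
    println(topKInSort.map(_.getLong(0)).mkString("[", ",", "]"))
    // Both runs are expected to produce the same rows; a mismatch reproduces the issue.
    assert(topKInMemory sameElements topKInSort)

{code}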
The real problem is that when the ordered and limited data is consumed further, for example joined with another table or written to an external table, the result is incorrect once the plan falls back to CollectLimitExec.
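To make the difference between the two paths concrete, here is a minimal model in plain Scala, not Spark code. It assumes, based on Spark 2.4's `limit.scala`, that `CollectLimitExec.executeCollect` delegates to `child.executeTake(limit)`, which scans partitions in index order, whereas `doExecute` (the path behind `df.rdd.collect`) takes `limit` rows per partition, shuffles them into a single partition and takes `limit` again; the shuffle does not promise that blocks arrive in partition-index order. `LimitPathsSketch`, `takeInOrder` and `takeAfterShuffle` are hypothetical names, and the arrival order is passed in explicitly instead of coming from a real shuffle.

{code:scala}
// Hypothetical model of the two CollectLimitExec paths; plain Scala, not Spark code.
object LimitPathsSketch {
  // Each inner Seq models one partition of globally sorted data: range
  // partitioning puts smaller keys in lower-indexed partitions, and each
  // partition is sorted locally.
  type Partitions = Seq[Seq[Long]]

  // Models the executeTake path: read partitions in index order and stop
  // after `limit` rows, so the global sort order is preserved.
  def takeInOrder(parts: Partitions, limit: Int): Seq[Long] =
    parts.iterator.flatMap(_.iterator).take(limit).toList

  // Models the doExecute path: take `limit` rows per partition, concatenate
  // the results in the order the single reducer receives them, then take
  // `limit` again. `arrival` is an assumed permutation of partition indices
  // standing in for the unspecified shuffle fetch order.
  def takeAfterShuffle(parts: Partitions, limit: Int, arrival: Seq[Int]): Seq[Long] = {
    val locallyLimited = parts.map(_.take(limit))
    arrival.flatMap(i => locallyLimited(i)).take(limit)
  }
}
{code}

With three range-sorted partitions `[1, 2, 3]`, `[4, 5, 6]`, `[7, 8, 9]` and `limit = 4`, `takeInOrder` returns 1, 2, 3, 4, while `takeAfterShuffle` with arrival order `(2, 0, 1)` returns 7, 8, 9, 1: every row is a real row, but they are not the smallest four.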

> Sort by disk when number of limit is big in TakeOrderedAndProjectExec
> ---------------------------------------------------------------------
>
>                 Key: SPARK-24193
>                 URL: https://issues.apache.org/jira/browse/SPARK-24193
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Jin Xing
>            Assignee: Jin Xing
>            Priority: Major
>             Fix For: 2.4.0
>
>
> The physical plan of "_select colA from t order by colB limit M_" is 
> _TakeOrderedAndProject_.
> Currently _TakeOrderedAndProject_ sorts data in memory, see 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L158
>
> Shall we add a config so that, if the limit (M) is too big, we sort on disk 
> instead? That would resolve the memory issue.
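A minimal sketch of the trade-off proposed here, in plain Scala rather than Spark's implementation. It assumes the planner compares the limit with a threshold (in the reproduction above, `SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD`) and keeps the in-memory top-K only when the limit is below it. The in-memory branch holds at most `k` rows in a bounded max-heap, similar in spirit to the bounded priority queue behind `RDD.takeOrdered`; the fallback branch sorts everything and then takes `k`, which in Spark is the part that can spill to disk. `TopKSketch`, `boundedHeapTopK`, `sortThenLimit`, `topK` and `fallbackThreshold` are hypothetical names.

{code:scala}
import scala.collection.mutable

// Hypothetical model of the two top-K strategies; not Spark's implementation.
object TopKSketch {
  // Keeps the k smallest elements in a max-heap of size at most k,
  // so memory is O(k) regardless of input size.
  def boundedHeapTopK[A](input: Iterator[A], k: Int)(implicit ord: Ordering[A]): Seq[A] = {
    // mutable.PriorityQueue dequeues the largest element under `ord`,
    // which is exactly the element to evict when a smaller one arrives.
    val heap = mutable.PriorityQueue.empty[A]
    input.foreach { x =>
      if (heap.size < k) heap.enqueue(x)
      else if (k > 0 && ord.lt(x, heap.head)) {
        heap.dequeue()
        heap.enqueue(x)
      }
    }
    // Heap iteration order is unspecified, so sort the survivors.
    heap.toList.sorted(ord)
  }

  // Models the fallback: sort the whole input, then take the first k.
  def sortThenLimit[A](input: Iterator[A], k: Int)(implicit ord: Ordering[A]): Seq[A] =
    input.toVector.sorted(ord).take(k)

  // Picks the in-memory top-K for small limits and the full sort for limits
  // at or above the (hypothetical) fallback threshold.
  def topK[A](input: Iterator[A], k: Int, fallbackThreshold: Int)(implicit ord: Ordering[A]): Seq[A] =
    if (k < fallbackThreshold) boundedHeapTopK(input, k) else sortThenLimit(input, k)
}
{code}

Both branches return the same rows for the same input and differ only in how much they hold in memory, which is why the reproduction expects `topKInMemory` and `topKInSort` to match.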



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
