[ https://issues.apache.org/jira/browse/SPARK-24193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17102285#comment-17102285 ]
Xianjin YE commented on SPARK-24193:
------------------------------------
Hi [[email protected]] [~cloud_fan], the fallback config has a correctness
issue; we may need to revert this change in Spark 2.4 and Spark 3.0.
Steps to reproduce:
{code:java}
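import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession
  .builder
  .appName("Spark TopK test")
  .master("local-cluster[8, 1, 1024]")
  .getOrCreate()

// 100000 rows across 10 partitions, generated in descending order
val data = spark.range(100000, 0, -1, 10).toDF("id").selectExpr("id + 1 as id")

// limit (200) exceeds the threshold (100): the fallback path is used
spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, 100)
val topKInSort = data.orderBy("id").limit(200).rdd.collect()

// limit (200) is far below the threshold: in-memory TakeOrderedAndProjectExec
spark.conf.set(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key, Int.MaxValue)
val topKInMemory = data.orderBy("id").limit(200).rdd.collect()

println(topKInMemory.map(_.getLong(0)).mkString("[", ",", "]"))
println(topKInSort.map(_.getLong(0)).mkString("[", ",", "]"))
// Fails when the fallback path returns different (out-of-order) rows
assert(topKInMemory sameElements topKInSort)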
{code}
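The issue:
The core of `CollectLimitExec` is
`sortedRDD.mapPartitionsInternal(_.take(limit)).repartition(1).mapPartitionsInternal(_.take(limit))`,
which does not preserve the ordering: the shuffle introduced by `repartition(1)`
fetches the map-side blocks in no particular order, so rows from different sorted
partitions can be concatenated out of order and the final `take(limit)` may keep
the wrong rows. Therefore, we cannot simply fall back to `CollectLimitExec`.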
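To make the ordering problem concrete, here is a small Spark-free Scala simulation. It assumes three already-sorted partitions (partition i precedes partition i+1 in global order) and models the order in which the single reducer receives blocks with an explicit, hypothetical `fetchOrder` parameter. `collectLimit` only mimics the pipeline above; it is not Spark's actual code.

```scala
object CollectLimitOrderingDemo {
  // In-memory stand-in for a range-partitioned, sorted RDD:
  // each partition holds a contiguous, ascending slice of the keys.
  val sortedPartitions: Seq[Seq[Long]] =
    Seq(Seq(1L, 2L, 3L), Seq(4L, 5L, 6L), Seq(7L, 8L, 9L))

  // Mimics sortedRDD.mapPartitionsInternal(_.take(limit))
  //   .repartition(1).mapPartitionsInternal(_.take(limit)).
  // `fetchOrder` (hypothetical) is the order in which the reducer happens
  // to receive the map-side blocks; a real shuffle gives no guarantee here.
  def collectLimit(parts: Seq[Seq[Long]], limit: Int, fetchOrder: Seq[Int]): Seq[Long] = {
    val locallyLimited = parts.map(_.take(limit))
    val singlePartition = fetchOrder.flatMap(i => locallyLimited(i))
    singlePartition.take(limit)
  }

  // The correct answer: the first `limit` rows in global sort order.
  def expectedTopK(parts: Seq[Seq[Long]], limit: Int): Seq[Long] =
    parts.flatten.take(limit)

  def main(args: Array[String]): Unit = {
    val limit = 4
    println(collectLimit(sortedPartitions, limit, Seq(0, 1, 2))) // List(1, 2, 3, 4)
    println(collectLimit(sortedPartitions, limit, Seq(2, 0, 1))) // List(7, 8, 9, 1)
    println(expectedTopK(sortedPartitions, limit))               // List(1, 2, 3, 4)
  }
}
```

Only when the blocks happen to arrive in partition-index order does the result match the true top-k; any other fetch order silently returns the wrong rows, which is what the assertion in the repro catches.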
Proposal to fix:
# Revert the fallback logic.
# Implement `CollectLimitExec` similarly to `CollectTailExec`; however, this may
not be suitable when the k in TopK is large.
# Alternatively, do a calculation similar to `CollectTailExec`, but collect only
the number of records in each partition, then use that information to take the
exact number of records needed from each partition.
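The third option can be sketched without Spark as follows. This assumes the partitions are already sorted and ordered by partition index; `ExactTakePlan`, `rowsToTake`, and `exactTopK` are hypothetical names, not Spark APIs, and how the counting pass maps onto actual Spark jobs is left open.

```scala
object ExactTakePlan {
  // Given the row count of each sorted partition (in partition-index order),
  // return how many rows each partition should keep so that, together, they
  // are exactly the first `limit` rows in global order.
  def rowsToTake(partitionCounts: Seq[Long], limit: Long): Seq[Long] = {
    // before(i) = number of rows in partitions 0 until i
    val before = partitionCounts.scanLeft(0L)(_ + _)
    partitionCounts.zip(before).map { case (count, prior) =>
      math.max(0L, math.min(count, limit - prior))
    }
  }

  // In-memory stand-in for the two passes: first count rows per partition
  // (after the local take(limit)), then let each partition keep only its quota.
  def exactTopK(parts: Seq[Seq[Long]], limit: Int): Seq[Long] = {
    val locallyLimited = parts.map(_.take(limit))
    val quota = rowsToTake(locallyLimited.map(_.size.toLong), limit.toLong)
    // Which rows survive no longer depends on shuffle fetch order;
    // here they are concatenated in partition-index order.
    locallyLimited.zip(quota).flatMap { case (p, n) => p.take(n.toInt) }
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq(1L, 2L, 3L), Seq(4L, 5L, 6L), Seq(7L, 8L, 9L))
    println(rowsToTake(parts.map(_.size.toLong), 4L)) // List(3, 1, 0)
    println(exactTopK(parts, 4))                      // List(1, 2, 3, 4)
  }
}
```

The counting pass only moves one number per partition to the driver, which is why this variant stays cheap even when k is large, unlike collecting the rows themselves.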
> Sort by disk when number of limit is big in TakeOrderedAndProjectExec
> ---------------------------------------------------------------------
>
> Key: SPARK-24193
> URL: https://issues.apache.org/jira/browse/SPARK-24193
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 2.3.0
> Reporter: Jin Xing
> Assignee: Jin Xing
> Priority: Major
> Fix For: 2.4.0
>
>
> The physical plan of "_select colA from t order by colB limit M_" is
> _TakeOrderedAndProject_.
> Currently _TakeOrderedAndProject_ sorts data in memory; see
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L158
>
> Shall we add a config so that, if the limit (M) is too big, we sort on disk
> instead? That would resolve the memory issue.
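For context, an in-memory top-M keeps up to M candidate rows per partition, so its memory footprint grows with M. The Scala sketch below illustrates this with a bounded max-heap over `Long` keys; `InMemoryTopM` and `takeSmallest` are hypothetical names, and the real _TakeOrderedAndProject_ code path differs in detail.

```scala
import scala.collection.mutable

object InMemoryTopM {
  // Keeps the m smallest values seen so far in a max-heap of size at most m,
  // so memory per partition is O(m) regardless of partition size.
  def takeSmallest(values: Iterator[Long], m: Int): Seq[Long] = {
    val heap = mutable.PriorityQueue.empty[Long] // max-heap by default
    values.foreach { v =>
      if (heap.size < m) heap.enqueue(v)
      else if (m > 0 && v < heap.head) { heap.dequeue(); heap.enqueue(v) }
    }
    heap.toList.sorted
  }

  // Per-partition top-m, then a final top-m over at most m * numPartitions candidates.
  def topM(parts: Seq[Seq[Long]], m: Int): Seq[Long] =
    takeSmallest(parts.iterator.flatMap(p => takeSmallest(p.iterator, m)), m)
}
```

With a large M, both the per-partition heaps and the final merge hold O(M) rows in memory, which is the pressure that sorting on disk is meant to relieve.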
--
This message was sent by Atlassian Jira
(v8.3.4#803005)