How about this:
1. We can make LocalLimit shuffle to multiple partitions, i.e. create a new
partitioner that dispatches the data uniformly:
import org.apache.spark.Partitioner

class LimitUniformPartitioner(partitions: Int) extends Partitioner {

  def numPartitions: Int = partitions

  // Round-robin counter; the key is ignored, so rows are spread evenly.
  // The partitioner is deserialized once per map task, so each task deals
  // its own rows out uniformly, which is all we need here.
  private var num = 0

  def getPartition(key: Any): Int = {
    num = num + 1
    num % partitions
  }

  override def equals(other: Any): Boolean = other match {
    case p: LimitUniformPartitioner => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
2. then in GlobalLimit, we only take the first
limit_number / num_of_shuffle_partitions elements in each partition, as
sketched below.
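Here is a rough sketch of that per-partition take (names like `globalLimit`
and `numShufflePartitions` are just placeholders; the real change would live
inside the GlobalLimit physical operator):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Sketch: after the uniform shuffle, every GlobalLimit task keeps only its
// share of the limit instead of a single task pulling all limited rows into
// one partition. The partition count is chosen so that it divides the limit
// exactly (see below), so integer division loses no rows.
def globalLimit(
    shuffled: RDD[InternalRow],
    limit: Int,
    numShufflePartitions: Int): RDD[InternalRow] = {
  val perPartitionLimit = limit / numShufflePartitions
  shuffled.mapPartitions { iter => iter.take(perPartitionLimit) }
}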
One issue left is how to decide the shuffle partition number.
We can have a config for the maximum number of elements each GlobalLimit
task should process, then factorize the limit to find a divisor closest to
that config. E.g. if the config is 2000:
if limit=10000, 10000 = 2000 * 5, we shuffle to 5 partitions
if limit=9999, 9999 = 1111 * 9, we shuffle to 9 partitions
if limit is a prime number, we just fall back to a single partition
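A rough sketch of that factorization (the config name `maxElementsPerTask`
is just a placeholder):

// Sketch: choose the shuffle partition count so that each GlobalLimit task
// processes at most maxElementsPerTask rows and the limit divides evenly
// across partitions.
def choosePartitions(limit: Int, maxElementsPerTask: Int): Int = {
  if (limit <= maxElementsPerTask) {
    1  // small enough for a single task
  } else {
    // Largest divisor of the limit not exceeding the per-task cap;
    // its co-divisor is the partition count.
    (maxElementsPerTask to 2 by -1).find(limit % _ == 0) match {
      case Some(perTask) => limit / perTask
      case None          => 1  // prime limit: fall back to one partition
    }
  }
}

// choosePartitions(10000, 2000) == 5
// choosePartitions(9999, 2000)  == 9
// choosePartitions(10007, 2000) == 1   (10007 is prime)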
best regards,
-zhenhua
-----Original Message-----
From: Liang-Chi Hsieh [mailto:[email protected]]
Sent: January 18, 2017 15:48
To: [email protected]
Subject: Re: Limit Query Performance Suggestion
Hi Sujith,
I saw your updated post. It makes sense to me now.
If you use a very big limit number, the shuffling before `GlobalLimit` would
be a performance bottleneck, of course, even though it can eventually shuffle
enough data to the single partition.
Unlike `CollectLimit`, I actually think there is no reason `GlobalLimit` must
shuffle all the limited data from all partitions to one single machine for
query execution. In other words, I think we can avoid shuffling data in
`GlobalLimit`.
I have an idea to improve this and may update here later if I can make it work.
sujith71955 wrote
> Dear Liang,
>
> Thanks for your valuable feedback.
>
> There was a mistake in the previous post; I corrected it. As you
> mentioned, in `GlobalLimit` we will only take the required number of
> rows from the input iterator, which really pulls data from local blocks
> and remote blocks.
> But if the limit value is very high (>= 10000000), a shuffle exchange
> happens between `GlobalLimit` and `LocalLimit` to retrieve data from
> all partitions into one partition; since the limit value is very large,
> the performance bottleneck still exists.
>
> Soon, in my next post, I will publish a test report with sample data
> and also propose a solution for this problem.
>
> Please let me know for any clarifications or suggestions regarding
> this issue.
>
> Regards,
> Sujith
-----
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/