[ https://issues.apache.org/jira/browse/SPARK-24587?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ryan Deak updated SPARK-24587:
------------------------------
Description:
*NOTE*: _This change is likely to be *very* impactful and probably matters only
when {{num}} is large, but without something like it, algorithms based on
distributed {{top-K}} do not scale well._
h2. Description
{{[RDD.takeOrdered|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1432-L1437]}}
uses
{{[reduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1011]}}
to combine {{num}}\-sized {{BoundedPriorityQueue}} instances, where {{num}} is
the size of the returned {{Array}}. Because {{reduce}} sends every partition's
queue to the driver, errors can occur even when the returned {{Array}} itself is
small relative to the driver memory.
An example error is:
{code}
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 28
tasks (8.1 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 29
tasks (8.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
...
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 160
tasks (46.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
{code}
It's clear from this message that although the final result will be only about
*0.3 GB* ({{46.4/160}}), the driver needs more than {{46 GB}} of memory to
combine the per-partition results.
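The arithmetic behind that estimate can be checked with a small sketch. It uses only the figures from the log above; the object and method names are made up for illustration, and it assumes every task returns a result of roughly the same size.

```scala
object DriverMemoryEstimate {
  // Figures taken from the log above.
  val totalSerializedGb = 46.4
  val numTasks = 160
  val maxResultSizeGb = 8.0

  // Average size of one serialized per-partition queue (about 0.29 GB).
  val perTaskGb: Double = totalSerializedGb / numTasks

  // With a flat reduce, the driver receives one queue per finished task.
  def flatReduceDriverGb(tasks: Int): Double = tasks * perTaskGb

  // First task count at which the accumulated results exceed the limit.
  def firstFailingTask: Int =
    (1 to numTasks).find(t => flatReduceDriverGb(t) > maxResultSizeGb).getOrElse(-1)

  def main(args: Array[String]): Unit = {
    println(f"per-task result: $perTaskGb%.2f GB")
    println(f"driver total for $numTasks tasks: ${flatReduceDriverGb(numTasks)}%.1f GB")
    println(s"limit first exceeded at task $firstFailingTask")
  }
}
```

Under that equal-size assumption, the limit is first exceeded at task 28, which matches the first error line in the log.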
h2. Proposed Solution
The amount of memory required can be dramatically reduced by using
{{[treeReduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1040]}}.
For instance, the {{else}} clause could be replaced with:
{code:language=scala}
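else {
  import scala.math.{ceil, log, max}
  // Tree depth of about log2(number of partitions), but at least 1.
  val depth = max(1, ceil(log(mapRDDs.partitions.length) / log(2)).toInt)
  mapRDDs.treeReduce(
    (queue1, queue2) => queue1 ++= queue2,  // ++= merges queue2 into queue1 and returns queue1
    depth
  ).toArray.sorted(ord)
}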
{code}
This should require less than double the network communication but should scale
to much larger values of the {{num}} parameter without configuration changes or
beefier machines.
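As a rough illustration of why a tree-shaped merge helps, the sketch below applies the depth formula from the proposal and merges per-partition top-k results pairwise using plain Scala collections. It does not use Spark and is not how {{treeReduce}} is implemented internally; {{TreeMergeSketch}}, {{mergeTopK}} and {{treeMerge}} are hypothetical names, and sorted {{Seq[Int]}} values stand in for {{BoundedPriorityQueue}} instances.

```scala
object TreeMergeSketch {
  import scala.math.{ceil, log, max}

  // Depth formula from the proposal above.
  def depthFor(numPartitions: Int): Int =
    max(1, ceil(log(numPartitions) / log(2)).toInt)

  // Keep the k smallest elements of two partial results
  // (a stand-in for queue1 ++= queue2 on bounded queues).
  def mergeTopK(k: Int, a: Seq[Int], b: Seq[Int]): Seq[Int] =
    (a ++ b).sorted.take(k)

  // Merge partial results pairwise, one tree level per round.
  // Returns the final top-k and the number of rounds used.
  @annotation.tailrec
  def treeMerge(k: Int, parts: Seq[Seq[Int]], rounds: Int = 0): (Seq[Int], Int) =
    if (parts.size <= 1) (parts.headOption.getOrElse(Seq.empty), rounds)
    else {
      val next = parts.grouped(2).map(_.reduce((a, b) => mergeTopK(k, a, b))).toVector
      treeMerge(k, next, rounds + 1)
    }

  def main(args: Array[String]): Unit = {
    val k = 3
    // 160 simulated partitions, each holding its own local top-k.
    val parts = Seq.tabulate(160)(p => Seq(p, p + 160, p + 320))
    val (result, rounds) = treeMerge(k, parts)
    println(s"result=$result rounds=$rounds depth=${depthFor(parts.size)}")
  }
}
```

In this sketch no single merge step ever holds more than two k-sized results, whereas a flat reduce gathers all 160 in one place. The number of pairwise rounds for 160 partitions equals the depth the formula produces.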
h2. Code Potentially Impacted
* MLlib's
{{[CountVectorizer|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala#L232]}}
was:
*NOTE*: _This change is likely to be *very* impactful and probably matters only
when {{num}} is large, but without something like it, algorithms based on
distributed {{top-K}} do not scale well._
h2. Description
{{[RDD.takeOrdered|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1432-L1437]}}
uses
{{[reduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1011]}}
to combine {{num}}\-sized {{BoundedPriorityQueue}} instances, where {{num}} is
the size of the returned {{Array}}. Because {{reduce}} sends every partition's
queue to the driver, errors can occur even when the returned {{Array}} itself is
small relative to the driver memory.
An example error is:
{code}
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 28
tasks (8.1 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 29
tasks (8.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
...
18/06/18 18:51:59 ERROR TaskSetManager: Total size of serialized results of 160
tasks (46.4 GB) is bigger than spark.driver.maxResultSize (8.0 GB)
{code}
It's clear from this message that although the final result will be only about
*0.3 GB* ({{46.4/160}}), the driver needs more than {{46 GB}} of memory to
combine the per-partition results.
h2. Proposed Solution
The amount of memory required can be dramatically reduced by using
{{[treeReduce|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1040]}}.
For instance, the {{else}} clause could be replaced with:
{code:language=scala}
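else {
  import scala.math.{ceil, log, max}
  // Tree depth of about log2(number of partitions), but at least 2.
  val depth = max(2, ceil(log(mapRDDs.partitions.length) / log(2)).toInt)
  mapRDDs.treeReduce(
    (queue1, queue2) => queue1 ++= queue2,  // ++= merges queue2 into queue1 and returns queue1
    depth
  ).toArray.sorted(ord)
}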
{code}
This should require less than double the network communication but should scale
to much larger values of the {{num}} parameter without configuration changes or
beefier machines.
h2. Code Potentially Impacted
* MLlib's
{{[CountVectorizer|https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala#L232]}}
> RDD.takeOrdered uses reduce, pulling all partition data to the driver
> ---------------------------------------------------------------------
>
> Key: SPARK-24587
> URL: https://issues.apache.org/jira/browse/SPARK-24587
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.3.1
> Reporter: Ryan Deak
> Priority: Major