Repository: spark
Updated Branches:
  refs/heads/master 30c4774f3 -> 26c1089c3


[SPARK-15748][SQL] Replace inefficient foldLeft() call with flatMap() in 
PartitionStatistics

`PartitionStatistics` uses `foldLeft` and list concatenation (`++`) to flatten 
a sequence of lists, but this is extremely inefficient compared to simply 
calling `flatMap`/`flatten`, because each `++` step allocates a new 
intermediate collection and so performs many unnecessary object allocations. 
Simply replacing this `foldLeft` with a `flatMap` results in decent 
performance gains when constructing `PartitionStatistics` instances for tables 
with many columns.

This patch makes that replacement and also applies the same change in two 
similar places, one in MLlib and one in streaming, in an attempt to fix all 
known occurrences of this pattern.

Author: Josh Rosen <[email protected]>

Closes #13491 from JoshRosen/foldleft-to-flatmap.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26c1089c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26c1089c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26c1089c

Branch: refs/heads/master
Commit: 26c1089c37149061f838129bb53330ded68ff4c9
Parents: 30c4774
Author: Josh Rosen <[email protected]>
Authored: Sun Jun 5 16:51:00 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Sun Jun 5 16:51:00 2016 -0700

----------------------------------------------------------------------
 mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala    | 2 +-
 .../org/apache/spark/sql/execution/columnar/ColumnStats.scala    | 4 ++--
 .../main/scala/org/apache/spark/streaming/ui/StreamingPage.scala | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26c1089c/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala 
b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index 94d1b83..8ed40c3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -422,7 +422,7 @@ private[ml] object MetaAlgorithmReadWrite {
       case rformModel: RFormulaModel => Array(rformModel.pipelineModel)
       case _: Params => Array()
     }
-    val subStageMaps = 
subStages.map(getUidMapImpl).foldLeft(List.empty[(String, Params)])(_ ++ _)
+    val subStageMaps = subStages.flatMap(getUidMapImpl)
     List((instance.uid, instance)) ++ subStageMaps
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/26c1089c/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
index 5d44769..470307b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnStats.scala
@@ -33,9 +33,9 @@ private[columnar] class ColumnStatisticsSchema(a: Attribute) 
extends Serializabl
 }
 
 private[columnar] class PartitionStatistics(tableSchema: Seq[Attribute]) 
extends Serializable {
-  val (forAttribute, schema) = {
+  val (forAttribute: AttributeMap[ColumnStatisticsSchema], schema: 
Seq[AttributeReference]) = {
     val allStats = tableSchema.map(a => a -> new ColumnStatisticsSchema(a))
-    (AttributeMap(allStats), 
allStats.map(_._2.schema).foldLeft(Seq.empty[Attribute])(_ ++ _))
+    (AttributeMap(allStats), allStats.flatMap(_._2.schema))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26c1089c/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index b97e24f..46cd309 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -396,11 +396,11 @@ private[ui] class StreamingPage(parent: StreamingTab)
       .map(_.ceil.toLong)
       .getOrElse(0L)
 
-    val content = 
listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).map {
+    val content: Seq[Node] = 
listener.receivedRecordRateWithBatchTime.toList.sortBy(_._1).flatMap {
       case (streamId, recordRates) =>
         generateInputDStreamRow(
           jsCollector, streamId, recordRates, minX, maxX, minY, maxYCalculated)
-    }.foldLeft[Seq[Node]](Nil)(_ ++ _)
+    }
 
     // scalastyle:off
     <table class="table table-bordered" style="width: auto">


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to