Repository: spark Updated Branches: refs/heads/master f9705d461 -> e8f0e016e
[SQL] When creating a partitioned table scan, explicitly create a single UnionRDD. Otherwise, the chain of nested pairwise unions (built by reducing the per-partition RDDs with `++`) will cause a stack overflow when there are many partitions. Author: Yin Huai <[email protected]> Closes #6162 from yhuai/partitionUnionedRDD and squashes the following commits: fa016d8 [Yin Huai] Explicitly create UnionRDD. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8f0e016 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8f0e016 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8f0e016 Branch: refs/heads/master Commit: e8f0e016eaf80a363796dd0a094291dcb3b35793 Parents: f9705d4 Author: Yin Huai <[email protected]> Authored: Fri May 15 12:04:26 2015 +0800 Committer: Cheng Lian <[email protected]> Committed: Fri May 15 12:04:26 2015 +0800 ---------------------------------------------------------------------- .../apache/spark/sql/sources/DataSourceStrategy.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e8f0e016/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala index a5410cd..ee099ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{UnionRDD, RDD} import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ 
@@ -169,9 +169,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { scan.execute() } - val unionedRows = perPartitionRows.reduceOption(_ ++ _).getOrElse { - relation.sqlContext.emptyResult - } + val unionedRows = + if (perPartitionRows.length == 0) { + relation.sqlContext.emptyResult + } else { + new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows) + } createPhysicalRDD(logicalRelation.relation, output, unionedRows) } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
