Repository: spark Updated Branches: refs/heads/master e6eb02df1 -> a92e095e7
[SPARK-21041][SQL] SparkSession.range should be consistent with SparkContext.range ## What changes were proposed in this pull request? This PR fixes the inconsistency in `SparkSession.range`. **BEFORE** ```scala scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect res2: Array[Long] = Array(9223372036854775804, 9223372036854775805, 9223372036854775806) ``` **AFTER** ```scala scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect res2: Array[Long] = Array() ``` ## How was this patch tested? Pass the Jenkins with newly added test cases. Author: Dongjoon Hyun <[email protected]> Closes #18257 from dongjoon-hyun/SPARK-21041. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a92e095e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a92e095e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a92e095e Branch: refs/heads/master Commit: a92e095e705155ea10c8311f7856b964d654626a Parents: e6eb02d Author: Dongjoon Hyun <[email protected]> Authored: Mon Jun 12 20:58:27 2017 +0800 Committer: Wenchen Fan <[email protected]> Committed: Mon Jun 12 20:58:27 2017 +0800 ---------------------------------------------------------------------- .../spark/sql/execution/basicPhysicalOperators.scala | 10 +++++++--- .../scala/org/apache/spark/sql/DataFrameRangeSuite.scala | 11 +++++++++++ 2 files changed, 18 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a92e095e/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index f69a688..04c1303 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -21,7 +21,7 @@ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration.Duration import org.apache.spark.{InterruptibleIterator, SparkException, TaskContext} -import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD} +import org.apache.spark.rdd.{EmptyRDD, PartitionwiseSampledRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} @@ -347,8 +347,12 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) } override def inputRDDs(): Seq[RDD[InternalRow]] = { - sqlContext.sparkContext.parallelize(0 until numSlices, numSlices) - .map(i => InternalRow(i)) :: Nil + val rdd = if (start == end || (start < end ^ 0 < step)) { + new EmptyRDD[InternalRow](sqlContext.sparkContext) + } else { + sqlContext.sparkContext.parallelize(0 until numSlices, numSlices).map(i => InternalRow(i)) + } + rdd :: Nil } protected override def doProduce(ctx: CodegenContext): String = { http://git-wip-us.apache.org/repos/asf/spark/blob/a92e095e/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala index 7b49565..45afbd2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala @@ -191,6 +191,17 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall checkAnswer(sql("SELECT * FROM range(3)"), Row(0) :: Row(1) :: Row(2) :: Nil) } } + + test("SPARK-21041 SparkSession.range()'s behavior is inconsistent with SparkContext.range()") { + val start = java.lang.Long.MAX_VALUE - 3 + val end = java.lang.Long.MIN_VALUE + 2 + Seq("false", "true").foreach { value => + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> value) { + assert(spark.range(start, end, 1).collect.length == 0) + assert(spark.range(start, start, 1).collect.length == 0) + } + } + } } object DataFrameRangeSuite { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
