Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6dcc1a3f0 -> 01a4d69f3


[SPARK-17162] Range does not support SQL generation

## What changes were proposed in this pull request?

The range operator previously didn't support SQL generation, which made it
impossible to use in views.

## How was this patch tested?

Unit tests.

cc hvanhovell

Author: Eric Liang <e...@databricks.com>

Closes #14724 from ericl/spark-17162.

(cherry picked from commit 84770b59f773f132073cd2af4204957fc2d7bf35)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01a4d69f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01a4d69f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01a4d69f

Branch: refs/heads/branch-2.0
Commit: 01a4d69f309a1cc8d370ce9f85e6a4f31b6db3b8
Parents: 6dcc1a3
Author: Eric Liang <e...@databricks.com>
Authored: Mon Aug 22 15:48:35 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Aug 22 15:48:43 2016 -0700

----------------------------------------------------------------------
 .../analysis/ResolveTableValuedFunctions.scala  | 11 ++++------
 .../plans/logical/basicLogicalOperators.scala   | 21 +++++++++++++-------
 .../apache/spark/sql/catalyst/SQLBuilder.scala  |  3 +++
 .../sql/execution/basicPhysicalOperators.scala  |  2 +-
 .../spark/sql/execution/command/views.scala     |  3 +--
 sql/hive/src/test/resources/sqlgen/range.sql    |  4 ++++
 .../test/resources/sqlgen/range_with_splits.sql |  4 ++++
 .../sql/catalyst/LogicalPlanToSQLSuite.scala    | 14 ++++++++++++-
 8 files changed, 44 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01a4d69f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
index 7fdf7fa..6b3bb68 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
@@ -28,9 +28,6 @@ import org.apache.spark.sql.types.{DataType, IntegerType, 
LongType}
  * Rule that resolves table-valued function references.
  */
 object ResolveTableValuedFunctions extends Rule[LogicalPlan] {
-  private lazy val defaultParallelism =
-    SparkContext.getOrCreate(new SparkConf(false)).defaultParallelism
-
   /**
    * List of argument names and their types, used to declare a function.
    */
@@ -84,25 +81,25 @@ object ResolveTableValuedFunctions extends 
Rule[LogicalPlan] {
     "range" -> Map(
       /* range(end) */
       tvf("end" -> LongType) { case Seq(end: Long) =>
-        Range(0, end, 1, defaultParallelism)
+        Range(0, end, 1, None)
       },
 
       /* range(start, end) */
       tvf("start" -> LongType, "end" -> LongType) { case Seq(start: Long, end: 
Long) =>
-        Range(start, end, 1, defaultParallelism)
+        Range(start, end, 1, None)
       },
 
       /* range(start, end, step) */
       tvf("start" -> LongType, "end" -> LongType, "step" -> LongType) {
         case Seq(start: Long, end: Long, step: Long) =>
-          Range(start, end, step, defaultParallelism)
+          Range(start, end, step, None)
       },
 
       /* range(start, end, step, numPartitions) */
       tvf("start" -> LongType, "end" -> LongType, "step" -> LongType,
           "numPartitions" -> IntegerType) {
         case Seq(start: Long, end: Long, step: Long, numPartitions: Int) =>
-          Range(start, end, step, numPartitions)
+          Range(start, end, step, Some(numPartitions))
       })
   )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/01a4d69f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index eb612c4..07e39b0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -422,17 +422,20 @@ case class Sort(
 
 /** Factory for constructing new `Range` nodes. */
 object Range {
-  def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
+  def apply(start: Long, end: Long, step: Long, numSlices: Option[Int]): Range 
= {
     val output = StructType(StructField("id", LongType, nullable = false) :: 
Nil).toAttributes
     new Range(start, end, step, numSlices, output)
   }
+  def apply(start: Long, end: Long, step: Long, numSlices: Int): Range = {
+    Range(start, end, step, Some(numSlices))
+  }
 }
 
 case class Range(
     start: Long,
     end: Long,
     step: Long,
-    numSlices: Int,
+    numSlices: Option[Int],
     output: Seq[Attribute])
   extends LeafNode with MultiInstanceRelation {
 
@@ -449,6 +452,14 @@ case class Range(
     }
   }
 
+  def toSQL(): String = {
+    if (numSlices.isDefined) {
+      s"SELECT id AS `${output.head.name}` FROM range($start, $end, $step, 
${numSlices.get})"
+    } else {
+      s"SELECT id AS `${output.head.name}` FROM range($start, $end, $step)"
+    }
+  }
+
   override def newInstance(): Range = copy(output = 
output.map(_.newInstance()))
 
   override lazy val statistics: Statistics = {
@@ -457,11 +468,7 @@ case class Range(
   }
 
   override def simpleString: String = {
-    if (step == 1) {
-      s"Range ($start, $end, splits=$numSlices)"
-    } else {
-      s"Range ($start, $end, step=$step, splits=$numSlices)"
-    }
+    s"Range ($start, $end, step=$step, splits=$numSlices)"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/01a4d69f/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index 9ac3480..5e0263e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -208,6 +208,9 @@ class SQLBuilder private (
     case p: LocalRelation =>
       p.toSQL(newSubqueryName())
 
+    case p: Range =>
+      p.toSQL()
+
     case OneRowRelation =>
       ""
 

http://git-wip-us.apache.org/repos/asf/spark/blob/01a4d69f/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index e6f7081..90bf817 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -312,7 +312,7 @@ case class RangeExec(range: 
org.apache.spark.sql.catalyst.plans.logical.Range)
 
   def start: Long = range.start
   def step: Long = range.step
-  def numSlices: Int = range.numSlices
+  def numSlices: Int = 
range.numSlices.getOrElse(sparkContext.defaultParallelism)
   def numElements: BigInt = range.numElements
 
   override val output: Seq[Attribute] = range.output

http://git-wip-us.apache.org/repos/asf/spark/blob/01a4d69f/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 6533d79..9e263d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -191,8 +191,7 @@ case class CreateViewCommand(
       sparkSession.sql(viewSQL).queryExecution.assertAnalyzed()
     } catch {
       case NonFatal(e) =>
-        throw new RuntimeException(
-          "Failed to analyze the canonicalized SQL. It is possible there is a 
bug in Spark.", e)
+        throw new RuntimeException(s"Failed to analyze the canonicalized SQL: 
${viewSQL}", e)
     }
 
     val viewSchema: Seq[CatalogColumn] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/01a4d69f/sql/hive/src/test/resources/sqlgen/range.sql
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/sqlgen/range.sql 
b/sql/hive/src/test/resources/sqlgen/range.sql
new file mode 100644
index 0000000..53c72ea
--- /dev/null
+++ b/sql/hive/src/test/resources/sqlgen/range.sql
@@ -0,0 +1,4 @@
+-- This file is automatically generated by LogicalPlanToSQLSuite.
+select * from range(100)
+--------------------------------------------------------------------------------
+SELECT `gen_attr_0` AS `id` FROM (SELECT `gen_attr_0` FROM (SELECT id AS 
`gen_attr_0` FROM range(0, 100, 1)) AS gen_subquery_0) AS gen_subquery_1

http://git-wip-us.apache.org/repos/asf/spark/blob/01a4d69f/sql/hive/src/test/resources/sqlgen/range_with_splits.sql
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/resources/sqlgen/range_with_splits.sql 
b/sql/hive/src/test/resources/sqlgen/range_with_splits.sql
new file mode 100644
index 0000000..83d637d
--- /dev/null
+++ b/sql/hive/src/test/resources/sqlgen/range_with_splits.sql
@@ -0,0 +1,4 @@
+-- This file is automatically generated by LogicalPlanToSQLSuite.
+select * from range(1, 100, 20, 10)
+--------------------------------------------------------------------------------
+SELECT `gen_attr_0` AS `id` FROM (SELECT `gen_attr_0` FROM (SELECT id AS 
`gen_attr_0` FROM range(1, 100, 20, 10)) AS gen_subquery_0) AS gen_subquery_1

http://git-wip-us.apache.org/repos/asf/spark/blob/01a4d69f/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
index 742b065..9c6da6a 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
@@ -23,7 +23,10 @@ import java.nio.file.{Files, NoSuchFileException, Paths}
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -180,7 +183,11 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with 
SQLTestUtils {
     }
 
     test("Test should fail if the SQL query cannot be regenerated") {
-      
spark.range(10).createOrReplaceTempView("not_sql_gen_supported_table_so_far")
+      case class Unsupported() extends LeafNode with MultiInstanceRelation {
+        override def newInstance(): Unsupported = copy()
+        override def output: Seq[Attribute] = Nil
+      }
+      
Unsupported().createOrReplaceTempView("not_sql_gen_supported_table_so_far")
       sql("select * from not_sql_gen_supported_table_so_far")
       val m3 = intercept[org.scalatest.exceptions.TestFailedException] {
         checkSQL("select * from not_sql_gen_supported_table_so_far", "in")
@@ -196,6 +203,11 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with 
SQLTestUtils {
     }
   }
 
+  test("range") {
+    checkSQL("select * from range(100)", "range")
+    checkSQL("select * from range(1, 100, 20, 10)", "range_with_splits")
+  }
+
   test("in") {
     checkSQL("SELECT id FROM parquet_t0 WHERE id IN (1, 2, 3)", "in")
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to