Repository: spark
Updated Branches:
  refs/heads/master 4df65184b -> b2dfa8495


[SPARK-14668][SQL] Move CurrentDatabase to Catalyst

## What changes were proposed in this pull request?

This PR moves `CurrentDatabase` from the sql/hive package to sql/catalyst. It also 
adds a function description for `current_database`, which looks like the following.

```
scala> sqlContext.sql("describe function extended 
current_database").collect.foreach(println)
[Function: current_database]
[Class: org.apache.spark.sql.execution.command.CurrentDatabase]
[Usage: current_database() - Returns the current database.]
[Extended Usage:
> SELECT current_database()]
```

## How was this patch tested?
Existing tests

Author: Yin Huai <[email protected]>

Closes #12424 from yhuai/SPARK-14668.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2dfa849
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2dfa849
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2dfa849

Branch: refs/heads/master
Commit: b2dfa849599843269a43e6e0f2ab8c539dfc32b6
Parents: 4df6518
Author: Yin Huai <[email protected]>
Authored: Fri Apr 15 17:48:41 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Fri Apr 15 17:48:41 2016 -0700

----------------------------------------------------------------------
 .../catalyst/analysis/FunctionRegistry.scala    |  1 +
 .../spark/sql/catalyst/expressions/misc.scala   | 12 ++++++++++
 .../sql/catalyst/optimizer/Optimizer.scala      | 24 +++++++++++++++++---
 .../optimizer/OptimizerExtendableSuite.scala    |  7 +++++-
 .../spark/sql/execution/SparkOptimizer.scala    |  7 +++++-
 .../spark/sql/internal/SessionState.scala       |  2 +-
 .../org/apache/spark/sql/hive/HiveContext.scala | 18 ---------------
 7 files changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index f2abf13..028463e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -337,6 +337,7 @@ object FunctionRegistry {
     expression[SparkPartitionID]("spark_partition_id"),
     expression[InputFileName]("input_file_name"),
     expression[MonotonicallyIncreasingID]("monotonically_increasing_id"),
+    expression[CurrentDatabase]("current_database"),
 
     // grouping sets
     expression[Cube]("cube"),

http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 4bd918e..113fc86 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -512,3 +512,15 @@ object XxHash64Function extends InterpretedHashFunction {
     XXH64.hashUnsafeBytes(base, offset, len, seed)
   }
 }
+
+/**
+ * Returns the current database of the SessionCatalog.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_() - Returns the current database.",
+  extended = "> SELECT _FUNC_()")
+private[sql] case class CurrentDatabase() extends LeafExpression with 
Unevaluable {
+  override def dataType: DataType = StringType
+  override def foldable: Boolean = true
+  override def nullable: Boolean = false
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f5172b2..bb68ef8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.catalyst.optimizer
 import scala.annotation.tailrec
 import scala.collection.immutable.HashSet
 
-import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, 
DistinctAggregationRewriter, EliminateSubqueryAliases}
+import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf}
+import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, 
DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
@@ -34,7 +36,9 @@ import org.apache.spark.sql.types._
  * Abstract class all optimizers should inherit of, contains the standard 
batches (extending
  * Optimizers can override this.
  */
-abstract class Optimizer extends RuleExecutor[LogicalPlan] {
+abstract class Optimizer(
+    conf: CatalystConf,
+    sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] {
   def batches: Seq[Batch] = {
     // Technically some of the rules in Finish Analysis are not optimizer 
rules and belong more
     // in the analyzer, because they are needed for correctness (e.g. 
ComputeCurrentTime).
@@ -43,6 +47,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
     Batch("Finish Analysis", Once,
       EliminateSubqueryAliases,
       ComputeCurrentTime,
+      GetCurrentDatabase(sessionCatalog),
       DistinctAggregationRewriter) ::
     
//////////////////////////////////////////////////////////////////////////////////////////
     // Optimizer rules start here
@@ -117,7 +122,10 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] 
{
  * To ensure extendability, we leave the standard rules in the abstract 
optimizer rules, while
  * specific rules go to the subclasses
  */
-object DefaultOptimizer extends Optimizer
+object DefaultOptimizer
+  extends Optimizer(
+    EmptyConf,
+    new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf))
 
 /**
  * Pushes operations down into a Sample.
@@ -1399,6 +1407,16 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
   }
 }
 
+/** Replaces the expression of CurrentDatabase with the current database name. 
*/
+case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends 
Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan transformAllExpressions {
+      case CurrentDatabase() =>
+        Literal.create(sessionCatalog.getCurrentDatabase, StringType)
+    }
+  }
+}
+
 /**
  * Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]] 
beneath it and a
  * [[SerializeFromObject]] above it.  If these serializations can't be 
eliminated, we should embed

http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
index 6e5672d..55af6c5 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.catalyst
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -38,7 +40,10 @@ class OptimizerExtendableSuite extends SparkFunSuite {
    * This class represents a dummy extended optimizer that takes the batches 
of the
    * Optimizer and adds custom ones.
    */
-  class ExtendedOptimizer extends Optimizer {
+  class ExtendedOptimizer
+    extends Optimizer(
+      EmptyConf,
+      new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, 
EmptyConf)) {
 
     // rules set to DummyRule, would not be executed anyways
     val myBatches: Seq[Batch] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index cbde777..8dfbba7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -18,9 +18,14 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.ExperimentalMethods
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 
-class SparkOptimizer(experimentalMethods: ExperimentalMethods) extends 
Optimizer {
+class SparkOptimizer(
+    conf: CatalystConf,
+    sessionCatalog: SessionCatalog,
+    experimentalMethods: ExperimentalMethods) extends Optimizer(conf, 
sessionCatalog) {
   override def batches: Seq[Batch] = super.batches :+ Batch(
     "User Provided Optimizers", FixedPoint(100), 
experimentalMethods.extraOptimizations: _*)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 69e3358..10497e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
    * Logical query plan optimizer.
    */
-  lazy val optimizer: Optimizer = new SparkOptimizer(experimentalMethods)
+  lazy val optimizer: Optimizer = new SparkOptimizer(conf, catalog, 
experimentalMethods)
 
   /**
    * Parser that extracts expressions, plans, table identifiers etc. from SQL 
texts.

http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ff93bfc..7ebdad1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -61,19 +61,6 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
 /**
- * Returns the current database of metadataHive.
- */
-private[hive] case class CurrentDatabase(ctx: HiveContext)
-  extends LeafExpression with CodegenFallback {
-  override def dataType: DataType = StringType
-  override def foldable: Boolean = true
-  override def nullable: Boolean = false
-  override def eval(input: InternalRow): Any = {
-    UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase)
-  }
-}
-
-/**
  * An instance of the Spark SQL execution engine that integrates with data 
stored in Hive.
  * Configuration for Hive is read from hive-site.xml on the classpath.
  *
@@ -133,11 +120,6 @@ class HiveContext private[hive](
   @transient
   protected[sql] override lazy val sessionState = new HiveSessionState(self)
 
-  // The Hive UDF current_database() is foldable, will be evaluated by 
optimizer,
-  // but the optimizer can't access the SessionState of metadataHive.
-  sessionState.functionRegistry.registerFunction(
-    "current_database", (e: Seq[Expression]) => new CurrentDatabase(self))
-
   /**
    * When true, enables an experimental feature where metastore tables that 
use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, 
instead of the Hive


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to