Repository: spark
Updated Branches:
refs/heads/master 4df65184b -> b2dfa8495
[SPARK-14668][SQL] Move CurrentDatabase to Catalyst
## What changes were proposed in this pull request?
This PR moves `CurrentDatabase` from the sql/hive package to sql/catalyst. It
also adds a function description, which looks like the following.
```
scala> sqlContext.sql("describe function extended current_database").collect.foreach(println)
[Function: current_database]
[Class: org.apache.spark.sql.catalyst.expressions.CurrentDatabase]
[Usage: current_database() - Returns the current database.]
[Extended Usage:
> SELECT current_database()]
```
## How was this patch tested?
Existing tests
Author: Yin Huai <[email protected]>
Closes #12424 from yhuai/SPARK-14668.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2dfa849
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2dfa849
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2dfa849
Branch: refs/heads/master
Commit: b2dfa849599843269a43e6e0f2ab8c539dfc32b6
Parents: 4df6518
Author: Yin Huai <[email protected]>
Authored: Fri Apr 15 17:48:41 2016 -0700
Committer: Reynold Xin <[email protected]>
Committed: Fri Apr 15 17:48:41 2016 -0700
----------------------------------------------------------------------
.../catalyst/analysis/FunctionRegistry.scala | 1 +
.../spark/sql/catalyst/expressions/misc.scala | 12 ++++++++++
.../sql/catalyst/optimizer/Optimizer.scala | 24 +++++++++++++++++---
.../optimizer/OptimizerExtendableSuite.scala | 7 +++++-
.../spark/sql/execution/SparkOptimizer.scala | 7 +++++-
.../spark/sql/internal/SessionState.scala | 2 +-
.../org/apache/spark/sql/hive/HiveContext.scala | 18 ---------------
7 files changed, 47 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index f2abf13..028463e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -337,6 +337,7 @@ object FunctionRegistry {
expression[SparkPartitionID]("spark_partition_id"),
expression[InputFileName]("input_file_name"),
expression[MonotonicallyIncreasingID]("monotonically_increasing_id"),
+ expression[CurrentDatabase]("current_database"),
// grouping sets
expression[Cube]("cube"),
http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index 4bd918e..113fc86 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -512,3 +512,15 @@ object XxHash64Function extends InterpretedHashFunction {
XXH64.hashUnsafeBytes(base, offset, len, seed)
}
}
+
+/**
+ * Returns the current database of the SessionCatalog.
+ */
+@ExpressionDescription(
+ usage = "_FUNC_() - Returns the current database.",
+ extended = "> SELECT _FUNC_()")
+private[sql] case class CurrentDatabase() extends LeafExpression with
Unevaluable {
+ override def dataType: DataType = StringType
+ override def foldable: Boolean = true
+ override def nullable: Boolean = false
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f5172b2..bb68ef8 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -20,7 +20,9 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import scala.collection.immutable.HashSet
-import org.apache.spark.sql.catalyst.analysis.{CleanupAliases,
DistinctAggregationRewriter, EliminateSubqueryAliases}
+import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf}
+import org.apache.spark.sql.catalyst.analysis.{CleanupAliases,
DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral,
TrueLiteral}
@@ -34,7 +36,9 @@ import org.apache.spark.sql.types._
* Abstract class all optimizers should inherit of, contains the standard
batches (extending
* Optimizers can override this.
*/
-abstract class Optimizer extends RuleExecutor[LogicalPlan] {
+abstract class Optimizer(
+ conf: CatalystConf,
+ sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] {
def batches: Seq[Batch] = {
// Technically some of the rules in Finish Analysis are not optimizer
rules and belong more
// in the analyzer, because they are needed for correctness (e.g.
ComputeCurrentTime).
@@ -43,6 +47,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
Batch("Finish Analysis", Once,
EliminateSubqueryAliases,
ComputeCurrentTime,
+ GetCurrentDatabase(sessionCatalog),
DistinctAggregationRewriter) ::
//////////////////////////////////////////////////////////////////////////////////////////
// Optimizer rules start here
@@ -117,7 +122,10 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan]
{
* To ensure extendability, we leave the standard rules in the abstract
optimizer rules, while
* specific rules go to the subclasses
*/
-object DefaultOptimizer extends Optimizer
+object DefaultOptimizer
+ extends Optimizer(
+ EmptyConf,
+ new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf))
/**
* Pushes operations down into a Sample.
@@ -1399,6 +1407,16 @@ object ComputeCurrentTime extends Rule[LogicalPlan] {
}
}
+/** Replaces the expression of CurrentDatabase with the current database name.
*/
+case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends
Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ plan transformAllExpressions {
+ case CurrentDatabase() =>
+ Literal.create(sessionCatalog.getCurrentDatabase, StringType)
+ }
+ }
+}
+
/**
* Typed [[Filter]] is by default surrounded by a [[DeserializeToObject]]
beneath it and a
* [[SerializeFromObject]] above it. If these serializations can't be
eliminated, we should embed
http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
----------------------------------------------------------------------
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
index 6e5672d..55af6c5 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
@@ -18,6 +18,8 @@
package org.apache.spark.sql.catalyst
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
@@ -38,7 +40,10 @@ class OptimizerExtendableSuite extends SparkFunSuite {
* This class represents a dummy extended optimizer that takes the batches
of the
* Optimizer and adds custom ones.
*/
- class ExtendedOptimizer extends Optimizer {
+ class ExtendedOptimizer
+ extends Optimizer(
+ EmptyConf,
+ new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry,
EmptyConf)) {
// rules set to DummyRule, would not be executed anyways
val myBatches: Seq[Batch] = {
http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index cbde777..8dfbba7 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -18,9 +18,14 @@
package org.apache.spark.sql.execution
import org.apache.spark.sql.ExperimentalMethods
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
-class SparkOptimizer(experimentalMethods: ExperimentalMethods) extends
Optimizer {
+class SparkOptimizer(
+ conf: CatalystConf,
+ sessionCatalog: SessionCatalog,
+ experimentalMethods: ExperimentalMethods) extends Optimizer(conf,
sessionCatalog) {
override def batches: Seq[Batch] = super.batches :+ Batch(
"User Provided Optimizers", FixedPoint(100),
experimentalMethods.extraOptimizations: _*)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 69e3358..10497e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* Logical query plan optimizer.
*/
- lazy val optimizer: Optimizer = new SparkOptimizer(experimentalMethods)
+ lazy val optimizer: Optimizer = new SparkOptimizer(conf, catalog,
experimentalMethods)
/**
* Parser that extracts expressions, plans, table identifiers etc. from SQL
texts.
http://git-wip-us.apache.org/repos/asf/spark/blob/b2dfa849/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ff93bfc..7ebdad1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -61,19 +61,6 @@ import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
/**
- * Returns the current database of metadataHive.
- */
-private[hive] case class CurrentDatabase(ctx: HiveContext)
- extends LeafExpression with CodegenFallback {
- override def dataType: DataType = StringType
- override def foldable: Boolean = true
- override def nullable: Boolean = false
- override def eval(input: InternalRow): Any = {
- UTF8String.fromString(ctx.sessionState.catalog.getCurrentDatabase)
- }
-}
-
-/**
* An instance of the Spark SQL execution engine that integrates with data
stored in Hive.
* Configuration for Hive is read from hive-site.xml on the classpath.
*
@@ -133,11 +120,6 @@ class HiveContext private[hive](
@transient
protected[sql] override lazy val sessionState = new HiveSessionState(self)
- // The Hive UDF current_database() is foldable, will be evaluated by
optimizer,
- // but the optimizer can't access the SessionState of metadataHive.
- sessionState.functionRegistry.registerFunction(
- "current_database", (e: Seq[Expression]) => new CurrentDatabase(self))
-
/**
* When true, enables an experimental feature where metastore tables that
use the parquet SerDe
* are automatically converted to use the Spark SQL parquet table scan,
instead of the Hive
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]