Repository: spark
Updated Branches:
  refs/heads/master 2f98ee67d -> 27aab8069


[SPARK-14013][SQL] Proper temp function support in catalog

## What changes were proposed in this pull request?

`SessionCatalog` was added in #11750. However, it does not support temporary
functions properly: right now we only store their metadata in the form of
`CatalogFunction`, which doesn't make sense for temporary functions because
there is no class name.

This patch moves the `FunctionRegistry` into the `SessionCatalog`. With this,
the user can call `catalog.createTempFunction` to register a temporary function
and `catalog.lookupFunction` to use a function they registered previously.
Note, however, that this is currently still dead code.

## How was this patch tested?

`SessionCatalogSuite`.

Author: Andrew Or <[email protected]>

Closes #11972 from andrewor14/temp-functions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27aab806
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27aab806
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27aab806

Branch: refs/heads/master
Commit: 27aab80695cfcf0c0ecf1e98a5a862a8123213a1
Parents: 2f98ee6
Author: Andrew Or <[email protected]>
Authored: Mon Mar 28 16:45:02 2016 -0700
Committer: Andrew Or <[email protected]>
Committed: Mon Mar 28 16:45:02 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 12 ++-
 .../catalyst/analysis/FunctionRegistry.scala    | 24 ++++++
 .../sql/catalyst/catalog/SessionCatalog.scala   | 82 ++++++++++++--------
 .../sql/catalyst/analysis/AnalysisTest.scala    |  2 +-
 .../analysis/DecimalPrecisionSuite.scala        |  2 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  | 57 +++++++-------
 .../optimizer/BooleanSimplificationSuite.scala  |  2 +-
 .../optimizer/EliminateSortsSuite.scala         |  2 +-
 .../spark/sql/internal/SessionState.scala       |  8 +-
 .../spark/sql/hive/HiveSessionCatalog.scala     |  4 +-
 .../spark/sql/hive/HiveSessionState.scala       | 14 ++--
 .../org/apache/spark/sql/hive/hiveUDFs.scala    | 10 +++
 12 files changed, 136 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3b83e68..8dc0532 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -42,9 +42,15 @@ import org.apache.spark.sql.types._
  * to resolve attribute references.
  */
 object SimpleAnalyzer
-  extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true))
-class SimpleAnalyzer(conf: CatalystConf)
-  extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), 
EmptyFunctionRegistry, conf)
+  extends SimpleAnalyzer(
+    EmptyFunctionRegistry,
+    new SimpleCatalystConf(caseSensitiveAnalysis = true))
+
+class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
+  extends Analyzer(
+    new SessionCatalog(new InMemoryCatalog, functionRegistry, conf),
+    functionRegistry,
+    conf)
 
 /**
  * Provides a logical query plan analyzer, which translates 
[[UnresolvedAttribute]]s and

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index f584a4b..e9788b7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -45,6 +45,13 @@ trait FunctionRegistry {
 
   /* Get the class of the registered function by specified name. */
   def lookupFunction(name: String): Option[ExpressionInfo]
+
+  /* Get the builder of the registered function by specified name. */
+  def lookupFunctionBuilder(name: String): Option[FunctionBuilder]
+
+  /** Drop a function and return whether the function existed. */
+  def dropFunction(name: String): Boolean
+
 }
 
 class SimpleFunctionRegistry extends FunctionRegistry {
@@ -76,6 +83,14 @@ class SimpleFunctionRegistry extends FunctionRegistry {
     functionBuilders.get(name).map(_._1)
   }
 
+  override def lookupFunctionBuilder(name: String): Option[FunctionBuilder] = 
synchronized {
+    functionBuilders.get(name).map(_._2)
+  }
+
+  override def dropFunction(name: String): Boolean = synchronized {
+    functionBuilders.remove(name).isDefined
+  }
+
   def copy(): SimpleFunctionRegistry = synchronized {
     val registry = new SimpleFunctionRegistry
     functionBuilders.iterator.foreach { case (name, (info, builder)) =>
@@ -106,6 +121,15 @@ object EmptyFunctionRegistry extends FunctionRegistry {
   override def lookupFunction(name: String): Option[ExpressionInfo] = {
     throw new UnsupportedOperationException
   }
+
+  override def lookupFunctionBuilder(name: String): Option[FunctionBuilder] = {
+    throw new UnsupportedOperationException
+  }
+
+  override def dropFunction(name: String): Boolean = {
+    throw new UnsupportedOperationException
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a9cf807..7165db1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,6 +22,9 @@ import scala.collection.mutable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, 
SimpleFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 
 
@@ -32,15 +35,22 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
  *
  * This class is not thread-safe.
  */
-class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) {
+class SessionCatalog(
+    externalCatalog: ExternalCatalog,
+    functionRegistry: FunctionRegistry,
+    conf: CatalystConf) {
   import ExternalCatalog._
 
+  def this(externalCatalog: ExternalCatalog, functionRegistry: 
FunctionRegistry) {
+    this(externalCatalog, functionRegistry, new SimpleCatalystConf(true))
+  }
+
+  // For testing only.
   def this(externalCatalog: ExternalCatalog) {
-    this(externalCatalog, new SimpleCatalystConf(true))
+    this(externalCatalog, new SimpleFunctionRegistry)
   }
 
   protected[this] val tempTables = new mutable.HashMap[String, LogicalPlan]
-  protected[this] val tempFunctions = new mutable.HashMap[String, 
CatalogFunction]
 
   // Note: we track current database here because certain operations do not 
explicitly
   // specify the database (e.g. DROP TABLE my_table). In these cases we must 
first
@@ -431,6 +441,18 @@ class SessionCatalog(externalCatalog: ExternalCatalog, 
conf: CatalystConf) {
     externalCatalog.alterFunction(db, newFuncDefinition)
   }
 
+  /**
+   * Retrieve the metadata of a metastore function.
+   *
+   * If a database is specified in `name`, this will return the function in 
that database.
+   * If no database is specified, this will return the function in the current 
database.
+   */
+  def getFunction(name: FunctionIdentifier): CatalogFunction = {
+    val db = name.database.getOrElse(currentDb)
+    externalCatalog.getFunction(db, name.funcName)
+  }
+
+
   // ----------------------------------------------------------------
   // | Methods that interact with temporary and metastore functions |
   // ----------------------------------------------------------------
@@ -439,14 +461,14 @@ class SessionCatalog(externalCatalog: ExternalCatalog, 
conf: CatalystConf) {
    * Create a temporary function.
    * This assumes no database is specified in `funcDefinition`.
    */
-  def createTempFunction(funcDefinition: CatalogFunction, ignoreIfExists: 
Boolean): Unit = {
-    require(funcDefinition.identifier.database.isEmpty,
-      "attempted to create a temporary function while specifying a database")
-    val name = funcDefinition.identifier.funcName
-    if (tempFunctions.contains(name) && !ignoreIfExists) {
+  def createTempFunction(
+      name: String,
+      funcDefinition: FunctionBuilder,
+      ignoreIfExists: Boolean): Unit = {
+    if (functionRegistry.lookupFunctionBuilder(name).isDefined && 
!ignoreIfExists) {
       throw new AnalysisException(s"Temporary function '$name' already 
exists.")
     }
-    tempFunctions.put(name, funcDefinition)
+    functionRegistry.registerFunction(name, funcDefinition)
   }
 
   /**
@@ -456,11 +478,10 @@ class SessionCatalog(externalCatalog: ExternalCatalog, 
conf: CatalystConf) {
   // Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to 
consolidate
   // dropFunction and dropTempFunction.
   def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
-    if (!tempFunctions.contains(name) && !ignoreIfNotExists) {
+    if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) {
       throw new AnalysisException(
         s"Temporary function '$name' cannot be dropped because it does not 
exist!")
     }
-    tempFunctions.remove(name)
   }
 
   /**
@@ -477,34 +498,29 @@ class SessionCatalog(externalCatalog: ExternalCatalog, 
conf: CatalystConf) {
       throw new AnalysisException("rename does not support moving functions 
across databases")
     }
     val db = oldName.database.getOrElse(currentDb)
-    if (oldName.database.isDefined || 
!tempFunctions.contains(oldName.funcName)) {
+    val oldBuilder = functionRegistry.lookupFunctionBuilder(oldName.funcName)
+    if (oldName.database.isDefined || oldBuilder.isEmpty) {
       externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
     } else {
-      val func = tempFunctions(oldName.funcName)
-      val newFunc = func.copy(identifier = func.identifier.copy(funcName = 
newName.funcName))
-      tempFunctions.remove(oldName.funcName)
-      tempFunctions.put(newName.funcName, newFunc)
+      val oldExpressionInfo = 
functionRegistry.lookupFunction(oldName.funcName).get
+      val newExpressionInfo = new ExpressionInfo(
+        oldExpressionInfo.getClassName,
+        newName.funcName,
+        oldExpressionInfo.getUsage,
+        oldExpressionInfo.getExtended)
+      functionRegistry.dropFunction(oldName.funcName)
+      functionRegistry.registerFunction(newName.funcName, newExpressionInfo, 
oldBuilder.get)
     }
   }
 
   /**
-   * Retrieve the metadata of an existing function.
-   *
-   * If a database is specified in `name`, this will return the function in 
that database.
-   * If no database is specified, this will first attempt to return a 
temporary function with
-   * the same name, then, if that does not exist, return the function in the 
current database.
+   * Return an [[Expression]] that represents the specified function, assuming 
it exists.
+   * Note: This is currently only used for temporary functions.
    */
-  def getFunction(name: FunctionIdentifier): CatalogFunction = {
-    val db = name.database.getOrElse(currentDb)
-    if (name.database.isDefined || !tempFunctions.contains(name.funcName)) {
-      externalCatalog.getFunction(db, name.funcName)
-    } else {
-      tempFunctions(name.funcName)
-    }
+  def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    functionRegistry.lookupFunction(name, children)
   }
 
-  // TODO: implement lookupFunction that returns something from the registry 
itself
-
   /**
    * List all matching functions in the specified database, including 
temporary functions.
    */
@@ -512,7 +528,7 @@ class SessionCatalog(externalCatalog: ExternalCatalog, 
conf: CatalystConf) {
     val dbFunctions =
       externalCatalog.listFunctions(db, pattern).map { f => 
FunctionIdentifier(f, Some(db)) }
     val regex = pattern.replaceAll("\\*", ".*").r
-    val _tempFunctions = tempFunctions.keys.toSeq
+    val _tempFunctions = functionRegistry.listFunction()
       .filter { f => regex.pattern.matcher(f).matches() }
       .map { f => FunctionIdentifier(f) }
     dbFunctions ++ _tempFunctions
@@ -521,8 +537,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog, 
conf: CatalystConf) {
   /**
    * Return a temporary function. For testing only.
    */
-  private[catalog] def getTempFunction(name: String): Option[CatalogFunction] 
= {
-    tempFunctions.get(name)
+  private[catalog] def getTempFunction(name: String): Option[FunctionBuilder] 
= {
+    functionRegistry.lookupFunctionBuilder(name)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 34cb976..3ec95ef 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -30,7 +30,7 @@ trait AnalysisTest extends PlanTest {
 
   private def makeAnalyzer(caseSensitive: Boolean): Analyzer = {
     val conf = new SimpleCatalystConf(caseSensitive)
-    val catalog = new SessionCatalog(new InMemoryCatalog, conf)
+    val catalog = new SessionCatalog(new InMemoryCatalog, 
EmptyFunctionRegistry, conf)
     catalog.createTempTable("TaBlE", TestRelations.testRelation, 
overrideIfExists = true)
     new Analyzer(catalog, EmptyFunctionRegistry, conf) {
       override val extendedResolutionRules = EliminateSubqueryAliases :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 6c08ccc..1a350bf 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types._
 
 class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
   private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
-  private val catalog = new SessionCatalog(new InMemoryCatalog, conf)
+  private val catalog = new SessionCatalog(new InMemoryCatalog, 
EmptyFunctionRegistry, conf)
   private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
   private val relation = LocalRelation(

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 2948c5f..acd9759 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
 
 
@@ -682,20 +683,20 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("create temp function") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempFunc1 = newFunc("temp1")
-    val tempFunc2 = newFunc("temp2")
-    catalog.createTempFunction(tempFunc1, ignoreIfExists = false)
-    catalog.createTempFunction(tempFunc2, ignoreIfExists = false)
+    val tempFunc1 = (e: Seq[Expression]) => e.head
+    val tempFunc2 = (e: Seq[Expression]) => e.last
+    catalog.createTempFunction("temp1", tempFunc1, ignoreIfExists = false)
+    catalog.createTempFunction("temp2", tempFunc2, ignoreIfExists = false)
     assert(catalog.getTempFunction("temp1") == Some(tempFunc1))
     assert(catalog.getTempFunction("temp2") == Some(tempFunc2))
     assert(catalog.getTempFunction("temp3") == None)
+    val tempFunc3 = (e: Seq[Expression]) => Literal(e.size)
     // Temporary function already exists
     intercept[AnalysisException] {
-      catalog.createTempFunction(tempFunc1, ignoreIfExists = false)
+      catalog.createTempFunction("temp1", tempFunc3, ignoreIfExists = false)
     }
     // Temporary function is overridden
-    val tempFunc3 = tempFunc1.copy(className = "something else")
-    catalog.createTempFunction(tempFunc3, ignoreIfExists = true)
+    catalog.createTempFunction("temp1", tempFunc3, ignoreIfExists = true)
     assert(catalog.getTempFunction("temp1") == Some(tempFunc3))
   }
 
@@ -725,8 +726,8 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("drop temp function") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempFunc = newFunc("func1")
-    catalog.createTempFunction(tempFunc, ignoreIfExists = false)
+    val tempFunc = (e: Seq[Expression]) => e.head
+    catalog.createTempFunction("func1", tempFunc, ignoreIfExists = false)
     assert(catalog.getTempFunction("func1") == Some(tempFunc))
     catalog.dropTempFunction("func1", ignoreIfNotExists = false)
     assert(catalog.getTempFunction("func1") == None)
@@ -755,20 +756,15 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
-  test("get temp function") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    val metastoreFunc = externalCatalog.getFunction("db2", "func1")
-    val tempFunc = newFunc("func1").copy(className = "something weird")
-    sessionCatalog.createTempFunction(tempFunc, ignoreIfExists = false)
-    sessionCatalog.setCurrentDatabase("db2")
-    // If a database is specified, we'll always return the function in that 
database
-    assert(sessionCatalog.getFunction(FunctionIdentifier("func1", 
Some("db2"))) == metastoreFunc)
-    // If no database is specified, we'll first return temporary functions
-    assert(sessionCatalog.getFunction(FunctionIdentifier("func1")) == tempFunc)
-    // Then, if no such temporary function exist, check the current database
-    sessionCatalog.dropTempFunction("func1", ignoreIfNotExists = false)
-    assert(sessionCatalog.getFunction(FunctionIdentifier("func1")) == 
metastoreFunc)
+  test("lookup temp function") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val tempFunc1 = (e: Seq[Expression]) => e.head
+    catalog.createTempFunction("func1", tempFunc1, ignoreIfExists = false)
+    assert(catalog.lookupFunction("func1", Seq(Literal(1), Literal(2), 
Literal(3))) == Literal(1))
+    catalog.dropTempFunction("func1", ignoreIfNotExists = false)
+    intercept[AnalysisException] {
+      catalog.lookupFunction("func1", Seq(Literal(1), Literal(2), Literal(3)))
+    }
   }
 
   test("rename function") {
@@ -813,8 +809,8 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("rename temp function") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempFunc = newFunc("func1").copy(className = "something weird")
-    sessionCatalog.createTempFunction(tempFunc, ignoreIfExists = false)
+    val tempFunc = (e: Seq[Expression]) => e.head
+    sessionCatalog.createTempFunction("func1", tempFunc, ignoreIfExists = 
false)
     sessionCatalog.setCurrentDatabase("db2")
     // If a database is specified, we'll always rename the function in that 
database
     sessionCatalog.renameFunction(
@@ -825,8 +821,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     // If no database is specified, we'll first rename temporary functions
     sessionCatalog.createFunction(newFunc("func1", Some("db2")))
     sessionCatalog.renameFunction(FunctionIdentifier("func1"), 
FunctionIdentifier("func4"))
-    assert(sessionCatalog.getTempFunction("func4") ==
-      Some(tempFunc.copy(identifier = FunctionIdentifier("func4"))))
+    assert(sessionCatalog.getTempFunction("func4") == Some(tempFunc))
     assert(sessionCatalog.getTempFunction("func1") == None)
     assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1", 
"func3"))
     // Then, if no such temporary function exist, rename the function in the 
current database
@@ -858,12 +853,12 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list functions") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempFunc1 = newFunc("func1").copy(className = "march")
-    val tempFunc2 = newFunc("yes_me").copy(className = "april")
+    val tempFunc1 = (e: Seq[Expression]) => e.head
+    val tempFunc2 = (e: Seq[Expression]) => e.last
     catalog.createFunction(newFunc("func2", Some("db2")))
     catalog.createFunction(newFunc("not_me", Some("db2")))
-    catalog.createTempFunction(tempFunc1, ignoreIfExists = false)
-    catalog.createTempFunction(tempFunc2, ignoreIfExists = false)
+    catalog.createTempFunction("func1", tempFunc1, ignoreIfExists = false)
+    catalog.createTempFunction("yes_me", tempFunc2, ignoreIfExists = false)
     assert(catalog.listFunctions("db1", "*").toSet ==
       Set(FunctionIdentifier("func1"),
         FunctionIdentifier("yes_me")))

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index e2c76b7..dd6b5ca 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -140,7 +140,7 @@ class BooleanSimplificationSuite extends PlanTest with 
PredicateHelper {
 
   private val caseInsensitiveConf = new SimpleCatalystConf(false)
   private val caseInsensitiveAnalyzer = new Analyzer(
-    new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf),
+    new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, 
caseInsensitiveConf),
     EmptyFunctionRegistry,
     caseInsensitiveConf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 3824c67..009889d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._
 
 class EliminateSortsSuite extends PlanTest {
   val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, 
orderByOrdinal = false)
-  val catalog = new SessionCatalog(new InMemoryCatalog, conf)
+  val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, 
conf)
   val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
   object Optimize extends RuleExecutor[LogicalPlan] {

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 9bc6407..f7fdfac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -44,14 +44,14 @@ private[sql] class SessionState(ctx: SQLContext) {
   lazy val experimentalMethods = new ExperimentalMethods
 
   /**
-   * Internal catalog for managing table and database states.
+   * Internal catalog for managing functions registered by the user.
    */
-  lazy val catalog = new SessionCatalog(ctx.externalCatalog, conf)
+  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
 
   /**
-   * Internal catalog for managing functions registered by the user.
+   * Internal catalog for managing table and database states.
    */
-  lazy val functionRegistry: FunctionRegistry = FunctionRegistry.builtin.copy()
+  lazy val catalog = new SessionCatalog(ctx.externalCatalog, functionRegistry, 
conf)
 
   /**
    * Interface exposed to the user for registering user-defined functions.

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index ec7bf61..ff12245 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -31,8 +32,9 @@ class HiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     client: HiveClient,
     context: HiveContext,
+    functionRegistry: FunctionRegistry,
     conf: SQLConf)
-  extends SessionCatalog(externalCatalog, conf) {
+  extends SessionCatalog(externalCatalog, functionRegistry, conf) {
 
   override def setCurrentDatabase(db: String): Unit = {
     super.setCurrentDatabase(db)

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index caa7f29..c9b6b1d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -35,13 +35,6 @@ private[hive] class HiveSessionState(ctx: HiveContext) 
extends SessionState(ctx)
   }
 
   /**
-   * Internal catalog for managing table and database states.
-   */
-  override lazy val catalog = {
-    new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, conf)
-  }
-
-  /**
    * Internal catalog for managing functions registered by the user.
    * Note that HiveUDFs will be overridden by functions registered in this 
context.
    */
@@ -50,6 +43,13 @@ private[hive] class HiveSessionState(ctx: HiveContext) 
extends SessionState(ctx)
   }
 
   /**
+   * Internal catalog for managing table and database states.
+   */
+  override lazy val catalog = {
+    new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, 
functionRegistry, conf)
+  }
+
+  /**
    * An analyzer that uses the Hive metastore.
    */
   override lazy val analyzer: Analyzer = {

http://git-wip-us.apache.org/repos/asf/spark/blob/27aab806/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index efaa052..c07c428 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -141,6 +141,16 @@ private[hive] class HiveFunctionRegistry(
       }
     }.getOrElse(None))
   }
+
+  override def lookupFunctionBuilder(name: String): Option[FunctionBuilder] = {
+    underlying.lookupFunctionBuilder(name)
+  }
+
+  // Note: This does not drop functions stored in the metastore
+  override def dropFunction(name: String): Boolean = {
+    underlying.dropFunction(name)
+  }
+
 }
 
 private[hive] case class HiveSimpleUDF(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to