This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 82a84ede6a47 [SPARK-46937][SQL] Revert "[SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry"
82a84ede6a47 is described below

commit 82a84ede6a47232fe3af86672ceea97f703b3e8a
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Jun 11 12:55:16 2024 -0700

    [SPARK-46937][SQL] Revert "[SPARK-46937][SQL] Improve concurrency performance for FunctionRegistry"
    
    ### What changes were proposed in this pull request?
    
    Reverts https://github.com/apache/spark/pull/44976 because it breaks thread-safety.
    
    ### Why are the changes needed?
    
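    Restore the thread-safety of `SimpleFunctionRegistryBase`. The reverted
    change replaced the `synchronized` accessors of `functionBuilders` with a
    `ConcurrentHashMap`, but `clone()` in `SimpleFunctionRegistry` and
    `SimpleTableFunctionRegistry` still synchronizes on the registry while it
    iterates the map, so that lock no longer excluded concurrent writers.
    This revert guards every access to `functionBuilders` with the registry's
    own lock again (`@GuardedBy("this")`).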
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #46940 from cloud-fan/revert.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/analysis/FunctionRegistry.scala   | 54 +++++++++++-----------
 1 file changed, 28 insertions(+), 26 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 588752f3fc17..3a418497fa53 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import java.util.Locale
-import java.util.concurrent.ConcurrentHashMap
+import javax.annotation.concurrent.GuardedBy
 
-import scala.jdk.CollectionConverters._
+import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import org.apache.spark.SparkUnsupportedOperationException
@@ -195,8 +195,9 @@ object FunctionRegistryBase {
 
 trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with 
Logging {
 
+  @GuardedBy("this")
   protected val functionBuilders =
-    new ConcurrentHashMap[FunctionIdentifier, (ExpressionInfo, 
FunctionBuilder)]
+    new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]
 
   // Resolution of the function name is always case insensitive, but the 
database name
   // depends on the caller
@@ -219,10 +220,10 @@ trait SimpleFunctionRegistryBase[T] extends 
FunctionRegistryBase[T] with Logging
   def internalRegisterFunction(
       name: FunctionIdentifier,
       info: ExpressionInfo,
-      builder: FunctionBuilder): Unit = {
+      builder: FunctionBuilder): Unit = synchronized {
     val newFunction = (info, builder)
     functionBuilders.put(name, newFunction) match {
-      case previousFunction if previousFunction != null =>
+      case Some(previousFunction) if previousFunction != newFunction =>
         logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
           log"previously registered function.")
       case _ =>
@@ -230,25 +231,34 @@ trait SimpleFunctionRegistryBase[T] extends 
FunctionRegistryBase[T] with Logging
   }
 
   override def lookupFunction(name: FunctionIdentifier, children: 
Seq[Expression]): T = {
-    val func = 
Option(functionBuilders.get(normalizeFuncName(name))).map(_._2).getOrElse {
-      throw QueryCompilationErrors.unresolvedRoutineError(name, 
Seq("system.builtin"))
+    val func = synchronized {
+      functionBuilders.get(normalizeFuncName(name)).map(_._2).getOrElse {
+        throw QueryCompilationErrors.unresolvedRoutineError(name, 
Seq("system.builtin"))
+      }
     }
     func(children)
   }
 
-  override def listFunction(): Seq[FunctionIdentifier] =
-    functionBuilders.keys().asScala.toSeq
+  override def listFunction(): Seq[FunctionIdentifier] = synchronized {
+    functionBuilders.iterator.map(_._1).toList
+  }
 
-  override def lookupFunction(name: FunctionIdentifier): 
Option[ExpressionInfo] =
-    Option(functionBuilders.get(normalizeFuncName(name))).map(_._1)
+  override def lookupFunction(name: FunctionIdentifier): 
Option[ExpressionInfo] = synchronized {
+    functionBuilders.get(normalizeFuncName(name)).map(_._1)
+  }
 
-  override def lookupFunctionBuilder(name: FunctionIdentifier): 
Option[FunctionBuilder] =
-    Option(functionBuilders.get(normalizeFuncName(name))).map(_._2)
+  override def lookupFunctionBuilder(
+      name: FunctionIdentifier): Option[FunctionBuilder] = synchronized {
+    functionBuilders.get(normalizeFuncName(name)).map(_._2)
+  }
 
-  override def dropFunction(name: FunctionIdentifier): Boolean =
-    Option(functionBuilders.remove(normalizeFuncName(name))).isDefined
+  override def dropFunction(name: FunctionIdentifier): Boolean = synchronized {
+    functionBuilders.remove(normalizeFuncName(name)).isDefined
+  }
 
-  override def clear(): Unit = functionBuilders.clear()
+  override def clear(): Unit = synchronized {
+    functionBuilders.clear()
+  }
 }
 
 /**
@@ -298,11 +308,7 @@ class SimpleFunctionRegistry
 
   override def clone(): SimpleFunctionRegistry = synchronized {
     val registry = new SimpleFunctionRegistry
-    val iterator = functionBuilders.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val name = entry.getKey
-      val (info, builder) = entry.getValue
+    functionBuilders.iterator.foreach { case (name, (info, builder)) =>
       registry.internalRegisterFunction(name, info, builder)
     }
     registry
@@ -1030,11 +1036,7 @@ class SimpleTableFunctionRegistry extends 
SimpleFunctionRegistryBase[LogicalPlan
 
   override def clone(): SimpleTableFunctionRegistry = synchronized {
     val registry = new SimpleTableFunctionRegistry
-    val iterator = functionBuilders.entrySet().iterator()
-    while (iterator.hasNext) {
-      val entry = iterator.next()
-      val name = entry.getKey
-      val (info, builder) = entry.getValue
+    functionBuilders.iterator.foreach { case (name, (info, builder)) =>
       registry.internalRegisterFunction(name, info, builder)
     }
     registry


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to