This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 82a84ede6a47 [SPARK-46937][SQL] Revert "[] Improve concurrency
performance for FunctionRegistry"
82a84ede6a47 is described below
commit 82a84ede6a47232fe3af86672ceea97f703b3e8a
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Jun 11 12:55:16 2024 -0700
[SPARK-46937][SQL] Revert "[] Improve concurrency performance for
FunctionRegistry"
### What changes were proposed in this pull request?
Reverts https://github.com/apache/spark/pull/44976, as it breaks the
thread-safety of the function registry.
### Why are the changes needed?
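To restore thread-safety: the registry goes back to a `synchronized`-guarded mutable map instead of a `ConcurrentHashMap`.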
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
N/A
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #46940 from cloud-fan/revert.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/catalyst/analysis/FunctionRegistry.scala | 54 +++++++++++-----------
1 file changed, 28 insertions(+), 26 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 588752f3fc17..3a418497fa53 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -18,9 +18,9 @@
package org.apache.spark.sql.catalyst.analysis
import java.util.Locale
-import java.util.concurrent.ConcurrentHashMap
+import javax.annotation.concurrent.GuardedBy
-import scala.jdk.CollectionConverters._
+import scala.collection.mutable
import scala.reflect.ClassTag
import org.apache.spark.SparkUnsupportedOperationException
@@ -195,8 +195,9 @@ object FunctionRegistryBase {
trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with
Logging {
+ @GuardedBy("this")
protected val functionBuilders =
- new ConcurrentHashMap[FunctionIdentifier, (ExpressionInfo,
FunctionBuilder)]
+ new mutable.HashMap[FunctionIdentifier, (ExpressionInfo, FunctionBuilder)]
// Resolution of the function name is always case insensitive, but the
database name
// depends on the caller
@@ -219,10 +220,10 @@ trait SimpleFunctionRegistryBase[T] extends
FunctionRegistryBase[T] with Logging
def internalRegisterFunction(
name: FunctionIdentifier,
info: ExpressionInfo,
- builder: FunctionBuilder): Unit = {
+ builder: FunctionBuilder): Unit = synchronized {
val newFunction = (info, builder)
functionBuilders.put(name, newFunction) match {
- case previousFunction if previousFunction != null =>
+ case Some(previousFunction) if previousFunction != newFunction =>
logWarning(log"The function ${MDC(FUNCTION_NAME, name)} replaced a " +
log"previously registered function.")
case _ =>
@@ -230,25 +231,34 @@ trait SimpleFunctionRegistryBase[T] extends
FunctionRegistryBase[T] with Logging
}
override def lookupFunction(name: FunctionIdentifier, children:
Seq[Expression]): T = {
- val func =
Option(functionBuilders.get(normalizeFuncName(name))).map(_._2).getOrElse {
- throw QueryCompilationErrors.unresolvedRoutineError(name,
Seq("system.builtin"))
+ val func = synchronized {
+ functionBuilders.get(normalizeFuncName(name)).map(_._2).getOrElse {
+ throw QueryCompilationErrors.unresolvedRoutineError(name,
Seq("system.builtin"))
+ }
}
func(children)
}
- override def listFunction(): Seq[FunctionIdentifier] =
- functionBuilders.keys().asScala.toSeq
+ override def listFunction(): Seq[FunctionIdentifier] = synchronized {
+ functionBuilders.iterator.map(_._1).toList
+ }
- override def lookupFunction(name: FunctionIdentifier):
Option[ExpressionInfo] =
- Option(functionBuilders.get(normalizeFuncName(name))).map(_._1)
+ override def lookupFunction(name: FunctionIdentifier):
Option[ExpressionInfo] = synchronized {
+ functionBuilders.get(normalizeFuncName(name)).map(_._1)
+ }
- override def lookupFunctionBuilder(name: FunctionIdentifier):
Option[FunctionBuilder] =
- Option(functionBuilders.get(normalizeFuncName(name))).map(_._2)
+ override def lookupFunctionBuilder(
+ name: FunctionIdentifier): Option[FunctionBuilder] = synchronized {
+ functionBuilders.get(normalizeFuncName(name)).map(_._2)
+ }
- override def dropFunction(name: FunctionIdentifier): Boolean =
- Option(functionBuilders.remove(normalizeFuncName(name))).isDefined
+ override def dropFunction(name: FunctionIdentifier): Boolean = synchronized {
+ functionBuilders.remove(normalizeFuncName(name)).isDefined
+ }
- override def clear(): Unit = functionBuilders.clear()
+ override def clear(): Unit = synchronized {
+ functionBuilders.clear()
+ }
}
/**
@@ -298,11 +308,7 @@ class SimpleFunctionRegistry
override def clone(): SimpleFunctionRegistry = synchronized {
val registry = new SimpleFunctionRegistry
- val iterator = functionBuilders.entrySet().iterator()
- while (iterator.hasNext) {
- val entry = iterator.next()
- val name = entry.getKey
- val (info, builder) = entry.getValue
+ functionBuilders.iterator.foreach { case (name, (info, builder)) =>
registry.internalRegisterFunction(name, info, builder)
}
registry
@@ -1030,11 +1036,7 @@ class SimpleTableFunctionRegistry extends
SimpleFunctionRegistryBase[LogicalPlan
override def clone(): SimpleTableFunctionRegistry = synchronized {
val registry = new SimpleTableFunctionRegistry
- val iterator = functionBuilders.entrySet().iterator()
- while (iterator.hasNext) {
- val entry = iterator.next()
- val name = entry.getKey
- val (info, builder) = entry.getValue
+ functionBuilders.iterator.foreach { case (name, (info, builder)) =>
registry.internalRegisterFunction(name, info, builder)
}
registry
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]