This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 293b2a3d51da [SPARK-46670][PYTHON][SQL][FOLLOW-UP] Do not pass static 
Python Data Sources around when cloning DataSourceManager
293b2a3d51da is described below

commit 293b2a3d51da6c0877a8cfd55ea76854fcde6c01
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Tue Jan 16 11:50:21 2024 +0900

    [SPARK-46670][PYTHON][SQL][FOLLOW-UP] Do not pass static Python Data Sources around when cloning DataSourceManager
    
    ### What changes were proposed in this pull request?
    
    This PR is a follow-up to https://github.com/apache/spark/pull/44681. It
    removes the logic that passes static Python Data Sources around when
    cloning `DataSourceManager`. Because these Data Sources are static, they
    do not actually need to be passed to the cloned manager.
    
    ### Why are the changes needed?
    
    For better readability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, dev-only.
    
    ### How was this patch tested?
    
    Existing test cases should cover this change.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #44743 from HyukjinKwon/SPARK-46670-followup.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../execution/datasources/DataSourceManager.scala  | 25 ++++++++++------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
index 28c93357d8b4..8ee2325ca1f9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
@@ -32,18 +32,15 @@ import org.apache.spark.util.Utils
  * A manager for user-defined data sources. It is used to register and lookup 
data sources by
  * their short names or fully qualified names.
  */
-class DataSourceManager(
-    initDataSourceBuilders: => Option[
-      Map[String, UserDefinedPythonDataSource]] = None
-   ) extends Logging {
+class DataSourceManager extends Logging {
   import DataSourceManager._
+
   // Lazy to avoid being invoked during Session initialization.
   // Otherwise, it goes infinite loop, session -> Python runner -> SQLConf -> 
session.
-  private lazy val staticDataSourceBuilders = initDataSourceBuilders.getOrElse 
{
-    initialDataSourceBuilders
-  }
+  private lazy val staticDataSourceBuilders = initialStaticDataSourceBuilders
 
-  private val dataSourceBuilders = new ConcurrentHashMap[String, 
UserDefinedPythonDataSource]()
+  private val runtimeDataSourceBuilders =
+    new ConcurrentHashMap[String, UserDefinedPythonDataSource]()
 
   /**
    * Register a data source builder for the given provider.
@@ -55,7 +52,7 @@ class DataSourceManager(
       // Cannot overwrite static Python Data Sources.
       throw QueryCompilationErrors.dataSourceAlreadyExists(name)
     }
-    val previousValue = dataSourceBuilders.put(normalizedName, source)
+    val previousValue = runtimeDataSourceBuilders.put(normalizedName, source)
     if (previousValue != null) {
       logWarning(f"The data source $name replaced a previously registered data 
source.")
     }
@@ -69,7 +66,7 @@ class DataSourceManager(
     if (dataSourceExists(name)) {
       val normalizedName = normalize(name)
       staticDataSourceBuilders.getOrElse(
-        normalizedName, dataSourceBuilders.get(normalizedName))
+        normalizedName, runtimeDataSourceBuilders.get(normalizedName))
     } else {
       throw QueryCompilationErrors.dataSourceDoesNotExist(name)
     }
@@ -81,12 +78,12 @@ class DataSourceManager(
   def dataSourceExists(name: String): Boolean = {
     val normalizedName = normalize(name)
     staticDataSourceBuilders.contains(normalizedName) ||
-      dataSourceBuilders.containsKey(normalizedName)
+      runtimeDataSourceBuilders.containsKey(normalizedName)
   }
 
   override def clone(): DataSourceManager = {
-    val manager = new DataSourceManager(Some(staticDataSourceBuilders))
-    dataSourceBuilders.forEach((k, v) => manager.registerDataSource(k, v))
+    val manager = new DataSourceManager
+    runtimeDataSourceBuilders.forEach((k, v) => manager.registerDataSource(k, 
v))
     manager
   }
 }
@@ -103,7 +100,7 @@ object DataSourceManager extends Logging {
 
   private def normalize(name: String): String = name.toLowerCase(Locale.ROOT)
 
-  private def initialDataSourceBuilders: Map[String, 
UserDefinedPythonDataSource] = {
+  private def initialStaticDataSourceBuilders: Map[String, 
UserDefinedPythonDataSource] = {
     if (Utils.isTesting || shouldLoadPythonDataSources) this.synchronized {
       if (dataSourceBuilders.isEmpty) {
         val maybeResult = try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to