This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 293b2a3d51da [SPARK-46670][PYTHON][SQL][FOLLOW-UP] Do not pass static Python Data Sources around when cloning DataSourceManager

293b2a3d51da is described below

commit 293b2a3d51da6c0877a8cfd55ea76854fcde6c01
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Tue Jan 16 11:50:21 2024 +0900

    [SPARK-46670][PYTHON][SQL][FOLLOW-UP] Do not pass static Python Data Sources around when cloning DataSourceManager

    ### What changes were proposed in this pull request?

    This PR is a follow-up of https://github.com/apache/spark/pull/44681; it removes the logic of passing static Python Data Sources around when cloning `DataSourceManager`. They are static Data Sources, so we don't actually have to pass them around.

    ### Why are the changes needed?

    For better readability.

    ### Does this PR introduce _any_ user-facing change?

    No, dev-only.

    ### How was this patch tested?

    Existing test cases should cover this.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #44743 from HyukjinKwon/SPARK-46670-followup.

    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../execution/datasources/DataSourceManager.scala | 25 ++++++++++------------
 1 file changed, 11 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
index 28c93357d8b4..8ee2325ca1f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
@@ -32,18 +32,15 @@ import org.apache.spark.util.Utils
  * A manager for user-defined data sources. It is used to register and lookup data sources by
  * their short names or fully qualified names.
  */
-class DataSourceManager(
-    initDataSourceBuilders: => Option[
-      Map[String, UserDefinedPythonDataSource]] = None
-  ) extends Logging {
+class DataSourceManager extends Logging {
   import DataSourceManager._
+
   // Lazy to avoid being invoked during Session initialization.
   // Otherwise, it goes infinite loop, session -> Python runner -> SQLConf -> session.
-  private lazy val staticDataSourceBuilders = initDataSourceBuilders.getOrElse {
-    initialDataSourceBuilders
-  }
+  private lazy val staticDataSourceBuilders = initialStaticDataSourceBuilders
 
-  private val dataSourceBuilders = new ConcurrentHashMap[String, UserDefinedPythonDataSource]()
+  private val runtimeDataSourceBuilders =
+    new ConcurrentHashMap[String, UserDefinedPythonDataSource]()
 
   /**
    * Register a data source builder for the given provider.
@@ -55,7 +52,7 @@ class DataSourceManager(
       // Cannot overwrite static Python Data Sources.
       throw QueryCompilationErrors.dataSourceAlreadyExists(name)
     }
-    val previousValue = dataSourceBuilders.put(normalizedName, source)
+    val previousValue = runtimeDataSourceBuilders.put(normalizedName, source)
     if (previousValue != null) {
       logWarning(f"The data source $name replaced a previously registered data source.")
     }
@@ -69,7 +66,7 @@ class DataSourceManager(
     if (dataSourceExists(name)) {
       val normalizedName = normalize(name)
       staticDataSourceBuilders.getOrElse(
-        normalizedName, dataSourceBuilders.get(normalizedName))
+        normalizedName, runtimeDataSourceBuilders.get(normalizedName))
     } else {
       throw QueryCompilationErrors.dataSourceDoesNotExist(name)
     }
@@ -81,12 +78,12 @@ class DataSourceManager(
   def dataSourceExists(name: String): Boolean = {
     val normalizedName = normalize(name)
     staticDataSourceBuilders.contains(normalizedName) ||
-      dataSourceBuilders.containsKey(normalizedName)
+      runtimeDataSourceBuilders.containsKey(normalizedName)
   }
 
   override def clone(): DataSourceManager = {
-    val manager = new DataSourceManager(Some(staticDataSourceBuilders))
-    dataSourceBuilders.forEach((k, v) => manager.registerDataSource(k, v))
+    val manager = new DataSourceManager
+    runtimeDataSourceBuilders.forEach((k, v) => manager.registerDataSource(k, v))
     manager
   }
 }
@@ -103,7 +100,7 @@ object DataSourceManager extends Logging {
 
   private def normalize(name: String): String = name.toLowerCase(Locale.ROOT)
 
-  private def initialDataSourceBuilders: Map[String, UserDefinedPythonDataSource] = {
+  private def initialStaticDataSourceBuilders: Map[String, UserDefinedPythonDataSource] = {
     if (Utils.isTesting || shouldLoadPythonDataSources) this.synchronized {
       if (dataSourceBuilders.isEmpty) {
         val maybeResult = try {
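Editor's note: for readers not familiar with the class, the sketch below illustrates the clone semantics the patch ends up with: static builders are resolved lazily from a process-wide source, so every instance sees the same map and clone() only needs to copy the builders registered at runtime. This is a minimal, self-contained Scala sketch, not the actual Spark class; names such as SketchManager, Builder, and loadStaticBuilders are hypothetical stand-ins.

import java.util.Locale
import java.util.concurrent.ConcurrentHashMap

object SketchManager {
  type Builder = String => String  // stand-in for UserDefinedPythonDataSource

  // Stand-in for the once-per-JVM lookup of static Python Data Sources.
  private lazy val loadStaticBuilders: Map[String, Builder] =
    Map("static_src" -> (opts => s"static($opts)"))
}

class SketchManager {
  import SketchManager._

  // Lazy so construction stays cheap; every instance reads the same shared map,
  // which is why clone() does not need to pass it along.
  private lazy val staticBuilders: Map[String, Builder] = loadStaticBuilders

  // Only sources registered at runtime are per-instance state.
  private val runtimeBuilders = new ConcurrentHashMap[String, Builder]()

  private def normalize(name: String): String = name.toLowerCase(Locale.ROOT)

  def register(name: String, builder: Builder): Unit = {
    val key = normalize(name)
    require(!staticBuilders.contains(key), s"Cannot overwrite static data source: $name")
    runtimeBuilders.put(key, builder)
  }

  def lookup(name: String): Builder = {
    val key = normalize(name)
    staticBuilders.getOrElse(key,
      Option(runtimeBuilders.get(key)).getOrElse(throw new NoSuchElementException(name)))
  }

  // Cloning copies only the runtime registrations; the static map is rebuilt
  // lazily (and identically) by the new instance.
  override def clone(): SketchManager = {
    val copy = new SketchManager
    runtimeBuilders.forEach((k, v) => copy.register(k, v))
    copy
  }
}

object SketchDemo extends App {
  val m = new SketchManager
  m.register("my_src", opts => s"runtime($opts)")
  val c = m.clone()
  println(c.lookup("my_src")("path=/tmp"))     // runtime(path=/tmp)
  println(c.lookup("static_src")("path=/tmp")) // static(path=/tmp)
}

The design point the patch relies on is that the static builders are a function of the JVM, not of any particular manager instance, so cloning can stay a plain copy of runtime registrations.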