This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 293b2a3d51da [SPARK-46670][PYTHON][SQL][FOLLOW-UP] Do not pass static
Python Data Sources around when cloning DataSourceManager
293b2a3d51da is described below
commit 293b2a3d51da6c0877a8cfd55ea76854fcde6c01
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Jan 16 11:50:21 2024 +0900
[SPARK-46670][PYTHON][SQL][FOLLOW-UP] Do not pass static Python Data
Sources around when cloning DataSourceManager
### What changes were proposed in this pull request?
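This PR is a follow-up to https://github.com/apache/spark/pull/44681. It
removes the logic that passes static Python Data Sources around when cloning
`DataSourceManager`. Because these Data Sources are static, there is no need to
pass them around. It also renames `dataSourceBuilders` to
`runtimeDataSourceBuilders` and `initialDataSourceBuilders` to
`initialStaticDataSourceBuilders`, so that static and runtime-registered Data
Sources are clearly distinguished. The rename also avoids confusion with the
companion object's own `dataSourceBuilders`.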
### Why are the changes needed?
For better readability.
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
Existing test cases should cover this change.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #44743 from HyukjinKwon/SPARK-46670-followup.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../execution/datasources/DataSourceManager.scala | 25 ++++++++++------------
1 file changed, 11 insertions(+), 14 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
index 28c93357d8b4..8ee2325ca1f9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceManager.scala
@@ -32,18 +32,15 @@ import org.apache.spark.util.Utils
* A manager for user-defined data sources. It is used to register and lookup
data sources by
* their short names or fully qualified names.
*/
-class DataSourceManager(
- initDataSourceBuilders: => Option[
- Map[String, UserDefinedPythonDataSource]] = None
- ) extends Logging {
+class DataSourceManager extends Logging {
import DataSourceManager._
+
// Lazy to avoid being invoked during Session initialization.
// Otherwise, it goes infinite loop, session -> Python runner -> SQLConf ->
session.
- private lazy val staticDataSourceBuilders = initDataSourceBuilders.getOrElse
{
- initialDataSourceBuilders
- }
+ private lazy val staticDataSourceBuilders = initialStaticDataSourceBuilders
- private val dataSourceBuilders = new ConcurrentHashMap[String,
UserDefinedPythonDataSource]()
+ private val runtimeDataSourceBuilders =
+ new ConcurrentHashMap[String, UserDefinedPythonDataSource]()
/**
* Register a data source builder for the given provider.
@@ -55,7 +52,7 @@ class DataSourceManager(
// Cannot overwrite static Python Data Sources.
throw QueryCompilationErrors.dataSourceAlreadyExists(name)
}
- val previousValue = dataSourceBuilders.put(normalizedName, source)
+ val previousValue = runtimeDataSourceBuilders.put(normalizedName, source)
if (previousValue != null) {
logWarning(f"The data source $name replaced a previously registered data
source.")
}
@@ -69,7 +66,7 @@ class DataSourceManager(
if (dataSourceExists(name)) {
val normalizedName = normalize(name)
staticDataSourceBuilders.getOrElse(
- normalizedName, dataSourceBuilders.get(normalizedName))
+ normalizedName, runtimeDataSourceBuilders.get(normalizedName))
} else {
throw QueryCompilationErrors.dataSourceDoesNotExist(name)
}
@@ -81,12 +78,12 @@ class DataSourceManager(
def dataSourceExists(name: String): Boolean = {
val normalizedName = normalize(name)
staticDataSourceBuilders.contains(normalizedName) ||
- dataSourceBuilders.containsKey(normalizedName)
+ runtimeDataSourceBuilders.containsKey(normalizedName)
}
override def clone(): DataSourceManager = {
- val manager = new DataSourceManager(Some(staticDataSourceBuilders))
- dataSourceBuilders.forEach((k, v) => manager.registerDataSource(k, v))
+ val manager = new DataSourceManager
+ runtimeDataSourceBuilders.forEach((k, v) => manager.registerDataSource(k,
v))
manager
}
}
@@ -103,7 +100,7 @@ object DataSourceManager extends Logging {
private def normalize(name: String): String = name.toLowerCase(Locale.ROOT)
- private def initialDataSourceBuilders: Map[String,
UserDefinedPythonDataSource] = {
+ private def initialStaticDataSourceBuilders: Map[String,
UserDefinedPythonDataSource] = {
if (Utils.isTesting || shouldLoadPythonDataSources) this.synchronized {
if (dataSourceBuilders.isEmpty) {
val maybeResult = try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]