This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new f1c69a5a687f [SPARK-49791][SQL] Make DelegatingCatalogExtension more
extendable
f1c69a5a687f is described below
commit f1c69a5a687fdb4e5a613fe43bbf6f6366f63fda
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Sep 26 13:39:02 2024 -0700
[SPARK-49791][SQL] Make DelegatingCatalogExtension more extendable
### What changes were proposed in this pull request?
This PR updates `DelegatingCatalogExtension` so that it's more extendable
- `initialize` is no longer final, so that subclasses can override it
- `delegate` becomes `protected`, so that subclasses can access it
In addition, this PR fixes a mistake: `DelegatingCatalogExtension` is
just a convenient default implementation; it is actually the `CatalogExtension`
interface that indicates that a catalog implementation will delegate requests to
the Spark session catalog. https://github.com/apache/spark/pull/47724 should
therefore use `CatalogExtension` instead.
### Why are the changes needed?
Unblock the Iceberg extension.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #48257 from cloud-fan/catalog.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 339dd5b93316fecd0455b53b2cedee2b5333a184)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/connector/catalog/DelegatingCatalogExtension.java | 4 ++--
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
.../apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 4 ++--
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index f6686d2e4d3b..786821514822 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -38,7 +38,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@Evolving
public abstract class DelegatingCatalogExtension implements CatalogExtension {
- private CatalogPlugin delegate;
+ protected CatalogPlugin delegate;
@Override
public final void setDelegateCatalog(CatalogPlugin delegate) {
@@ -51,7 +51,7 @@ public abstract class DelegatingCatalogExtension implements
CatalogExtension {
}
@Override
- public final void initialize(String name, CaseInsensitiveStringMap options)
{}
+ public void initialize(String name, CaseInsensitiveStringMap options) {}
@Override
public Set<TableCatalogCapability> capabilities() {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2506cb736f18..f1664f66b7f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -568,7 +568,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T])
{
val canUseV2 = lookupV2Provider().isDefined ||
(df.sparkSession.sessionState.conf.getConf(
SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
- .isInstanceOf[DelegatingCatalogExtension])
+ .isInstanceOf[CatalogExtension])
session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7500f32ac2b9..0a86a043985e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL,
ResolveDefaultColumns => DefaultCols}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin,
CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces,
V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogExtension,
CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog,
SupportsNamespaces, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.execution.command._
@@ -691,6 +691,6 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
isSessionCatalog(catalog) && (
SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
- catalog.isInstanceOf[DelegatingCatalogExtension])
+ catalog.isInstanceOf[CatalogExtension])
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]