This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 339dd5b93316 [SPARK-49791][SQL] Make DelegatingCatalogExtension more
extendable
339dd5b93316 is described below
commit 339dd5b93316fecd0455b53b2cedee2b5333a184
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu Sep 26 13:39:02 2024 -0700
[SPARK-49791][SQL] Make DelegatingCatalogExtension more extendable
### What changes were proposed in this pull request?
This PR updates `DelegatingCatalogExtension` so that it's more extendable
- `initialize` becomes not final, so that sub-classes can overwrite it
- `delegate` becomes `protected`, so that sub-classes can access it
In addition, this PR fixes a mistake that `DelegatingCatalogExtension` is
just a convenient default implementation, it's actually the `CatalogExtension`
interface that indicates this catalog implementation will delegate requests to
the Spark session catalog. https://github.com/apache/spark/pull/47724 should
use `CatalogExtension` instead.
### Why are the changes needed?
Unblock the Iceberg extension.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #48257 from cloud-fan/catalog.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/connector/catalog/DelegatingCatalogExtension.java | 4 ++--
.../apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala | 4 ++--
.../scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala | 2 +-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index f6686d2e4d3b..786821514822 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -38,7 +38,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@Evolving
public abstract class DelegatingCatalogExtension implements CatalogExtension {
- private CatalogPlugin delegate;
+ protected CatalogPlugin delegate;
@Override
public final void setDelegateCatalog(CatalogPlugin delegate) {
@@ -51,7 +51,7 @@ public abstract class DelegatingCatalogExtension implements
CatalogExtension {
}
@Override
- public final void initialize(String name, CaseInsensitiveStringMap options)
{}
+ public void initialize(String name, CaseInsensitiveStringMap options) {}
@Override
public Set<TableCatalogCapability> capabilities() {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 02ad2e79a564..a9ad7523c8fb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL,
ResolveDefaultColumns => DefaultCols}
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin,
CatalogV2Util, DelegatingCatalogExtension, LookupCatalog, SupportsNamespaces,
V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogExtension,
CatalogManager, CatalogPlugin, CatalogV2Util, LookupCatalog,
SupportsNamespaces, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.command._
@@ -706,6 +706,6 @@ class ResolveSessionCatalog(val catalogManager:
CatalogManager)
private def supportsV1Command(catalog: CatalogPlugin): Boolean = {
isSessionCatalog(catalog) && (
SQLConf.get.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isEmpty ||
- catalog.isInstanceOf[DelegatingCatalogExtension])
+ catalog.isInstanceOf[CatalogExtension])
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
index f0eef9ae1cbb..8164d33f46fe 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -429,7 +429,7 @@ final class DataFrameWriterImpl[T] private[sql](ds:
Dataset[T]) extends DataFram
val canUseV2 = lookupV2Provider().isDefined ||
(df.sparkSession.sessionState.conf.getConf(
SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
- .isInstanceOf[DelegatingCatalogExtension])
+ .isInstanceOf[CatalogExtension])
session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]