This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 212a59d7a5be [SPARK-49393][SQL] Fail by default in deprecated catalog 
plugin APIs
212a59d7a5be is described below

commit 212a59d7a5be1e2ecda41a6b46771b7b2ee1bedc
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Aug 27 15:53:20 2024 -0700

    [SPARK-49393][SQL] Fail by default in deprecated catalog plugin APIs
    
    We should not force users to implement these deprecated catalog plugin 
APIs, such as `TableCatalog#createTable` that takes `StructType`. This PR adds 
default implementations for them which simply fail and ask to implement one of 
the non-deprecated methods.
    
    According to 
https://docs.oracle.com/javase/specs/jls/se8/html/jls-13.html#jls-13.5.6, 
adding default implementation to an existing interface method is binary 
compatible.
    
    To simplify catalog implementations.
    
    no
    
    N/A
    
    No
    
    Closes #47874 from cloud-fan/minor.
    
    Lead-authored-by: Wenchen Fan <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/connector/catalog/StagingTableCatalog.java | 21 +++++++++++++------
 .../spark/sql/connector/catalog/TableCatalog.java  |  6 ++++--
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  7 +------
 .../spark/sql/errors/QueryCompilationErrors.scala  | 10 ++++++++-
 .../connector/catalog/InMemoryTableCatalog.scala   | 10 ---------
 .../datasources/v2/jdbc/JDBCTableCatalog.scala     |  9 --------
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 24 ----------------------
 7 files changed, 29 insertions(+), 58 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
index 50643b9cbe32..6f074faf6e58 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
@@ -27,6 +27,7 @@ import 
org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.write.BatchWrite;
 import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.errors.QueryCompilationErrors;
 import org.apache.spark.sql.types.StructType;
 
 /**
@@ -61,11 +62,13 @@ public interface StagingTableCatalog extends TableCatalog {
    * {@link #stageCreate(Identifier, Column[], Transform[], Map)} instead.
    */
   @Deprecated(since = "3.4.0")
-  StagedTable stageCreate(
+  default StagedTable stageCreate(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException;
+      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException {
+    throw QueryCompilationErrors.mustOverrideOneMethodError("stageCreate");
+  }
 
   /**
    * Stage the creation of a table, preparing it to be committed into the 
metastore.
@@ -101,11 +104,14 @@ public interface StagingTableCatalog extends TableCatalog 
{
    * This is deprecated, please override
    * {@link #stageReplace(Identifier, StructType, Transform[], Map)} instead.
    */
-  StagedTable stageReplace(
+  @Deprecated(since = "3.4.0")
+  default StagedTable stageReplace(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException, 
NoSuchTableException;
+      Map<String, String> properties) throws NoSuchNamespaceException, 
NoSuchTableException {
+    throw QueryCompilationErrors.mustOverrideOneMethodError("stageReplace");
+  }
 
   /**
    * Stage the replacement of a table, preparing it to be committed into the 
metastore when the
@@ -151,11 +157,14 @@ public interface StagingTableCatalog extends TableCatalog 
{
    * This is deprecated, please override
    * {@link #stageCreateOrReplace(Identifier, Column[], Transform[], Map)} 
instead.
    */
-  StagedTable stageCreateOrReplace(
+  @Deprecated(since = "3.4.0")
+  default StagedTable stageCreateOrReplace(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws NoSuchNamespaceException;
+      Map<String, String> properties) throws NoSuchNamespaceException {
+    throw 
QueryCompilationErrors.mustOverrideOneMethodError("stageCreateOrReplace");
+  }
 
   /**
    * Stage the creation or replacement of a table, preparing it to be 
committed into the metastore
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index ad4fe743218f..ba3470f85338 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -195,11 +195,13 @@ public interface TableCatalog extends CatalogPlugin {
    * {@link #createTable(Identifier, Column[], Transform[], Map)} instead.
    */
   @Deprecated(since = "3.4.0")
-  Table createTable(
+  default Table createTable(
       Identifier ident,
       StructType schema,
       Transform[] partitions,
-      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException;
+      Map<String, String> properties) throws TableAlreadyExistsException, 
NoSuchNamespaceException {
+    throw QueryCompilationErrors.mustOverrideOneMethodError("createTable");
+  }
 
   /**
    * Create a table in the catalog.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 92ab804f0a70..0164af945ca2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.connector.catalog.{View => _, _}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.{After, 
ColumnPosition}
 import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction => 
V2AggregateFunction, ScalarFunction, UnboundFunction}
-import org.apache.spark.sql.connector.expressions.{FieldReference, 
IdentityTransform, Transform}
+import org.apache.spark.sql.connector.expressions.{FieldReference, 
IdentityTransform}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -87,11 +87,6 @@ object FakeV2SessionCatalog extends TableCatalog with 
FunctionCatalog with Suppo
   override def loadTable(ident: Identifier): Table = {
     throw new NoSuchTableException(ident.asMultipartIdentifier)
   }
-  override def createTable(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): Table = fail()
   override def alterTable(ident: Identifier, changes: TableChange*): Table = 
fail()
   override def dropTable(ident: Identifier): Boolean = fail()
   override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = 
fail()
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index dae358e4ef0b..159154b014c1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -21,7 +21,7 @@ import java.util.Locale
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, 
SparkUnsupportedOperationException}
+import org.apache.spark.{SPARK_DOC_ROOT, SparkException, 
SparkRuntimeException, SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, 
FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
 import 
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, 
FunctionAlreadyExistsException, NamespaceAlreadyExistsException, 
NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, 
NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex}
@@ -4091,6 +4091,14 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       "createTable(..., Array[Column], ...)")
   }
 
+  def mustOverrideOneMethodError(methodName: String): RuntimeException = {
+    val msg = s"You must override one `$methodName`. It's preferred to not 
override the " +
+      "deprecated one."
+    new SparkRuntimeException(
+      "INTERNAL_ERROR",
+      Map("message" -> msg))
+  }
+
   def cannotAssignEventTimeColumn(): Throwable = {
     new AnalysisException(
       errorClass = "CANNOT_ASSIGN_EVENT_TIME_COLUMN_WITHOUT_WATERMARK",
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 0515237adfae..982de88e5884 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -25,8 +25,6 @@ import scala.jdk.CollectionConverters._
 import 
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, 
NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, 
TableAlreadyExistsException}
 import org.apache.spark.sql.connector.distributions.{Distribution, 
Distributions}
 import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class BasicInMemoryTableCatalog extends TableCatalog {
@@ -85,14 +83,6 @@ class BasicInMemoryTableCatalog extends TableCatalog {
     invalidatedTables.add(ident)
   }
 
-  override def createTable(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): Table = {
-    throw QueryCompilationErrors.createTableDeprecatedError()
-  }
-
   override def createTable(
       ident: Identifier,
       columns: Array[Column],
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index e7a3fe0f8aa7..3871bdf50177 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.errors.{DataTypeErrorsBase, 
QueryCompilationErrors,
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, 
JdbcOptionsInWrite, JDBCRDD, JdbcUtils}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class JDBCTableCatalog extends TableCatalog
@@ -144,14 +143,6 @@ class JDBCTableCatalog extends TableCatalog
     }
   }
 
-  override def createTable(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: java.util.Map[String, String]): Table = {
-    throw QueryCompilationErrors.createTableDeprecatedError()
-  }
-
   override def createTable(
       ident: Identifier,
       columns: Array[Column],
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index de398c49a0eb..a61a266c1ed5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3870,14 +3870,6 @@ class ReadOnlyCatalog extends InMemoryCatalog {
 }
 
 class FakeStagedTableCatalog extends InMemoryCatalog with StagingTableCatalog {
-  override def stageCreate(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): StagedTable = {
-    throw new RuntimeException("shouldn't be called")
-  }
-
   override def stageCreate(
       ident: Identifier,
       columns: Array[ColumnV2],
@@ -3887,14 +3879,6 @@ class FakeStagedTableCatalog extends InMemoryCatalog 
with StagingTableCatalog {
     null
   }
 
-  override def stageReplace(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): StagedTable = {
-    throw new RuntimeException("shouldn't be called")
-  }
-
   override def stageReplace(
       ident: Identifier,
       columns: Array[ColumnV2],
@@ -3905,14 +3889,6 @@ class FakeStagedTableCatalog extends InMemoryCatalog 
with StagingTableCatalog {
     null
   }
 
-  override def stageCreateOrReplace(
-      ident: Identifier,
-      schema: StructType,
-      partitions: Array[Transform],
-      properties: util.Map[String, String]): StagedTable = {
-    throw new RuntimeException("shouldn't be called")
-  }
-
   override def stageCreateOrReplace(
       ident: Identifier,
       columns: Array[ColumnV2],


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to