This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 212a59d7a5be [SPARK-49393][SQL] Fail by default in deprecated catalog
plugin APIs
212a59d7a5be is described below
commit 212a59d7a5be1e2ecda41a6b46771b7b2ee1bedc
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Aug 27 15:53:20 2024 -0700
[SPARK-49393][SQL] Fail by default in deprecated catalog plugin APIs
We should not force users to implement these deprecated catalog plugin
APIs, such as `TableCatalog#createTable` that takes `StructType`. This PR adds
a default implementation for them which simply fails and asks to implement one of
the new methods.
According to
https://docs.oracle.com/javase/specs/jls/se8/html/jls-13.html#jls-13.5.6,
adding default implementation to an existing interface method is binary
compatible.
to simplify catalog implementations.
no
N/A
No
Closes #47874 from cloud-fan/minor.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../sql/connector/catalog/StagingTableCatalog.java | 21 +++++++++++++------
.../spark/sql/connector/catalog/TableCatalog.java | 6 ++++--
.../spark/sql/catalyst/analysis/Analyzer.scala | 7 +------
.../spark/sql/errors/QueryCompilationErrors.scala | 10 ++++++++-
.../connector/catalog/InMemoryTableCatalog.scala | 10 ---------
.../datasources/v2/jdbc/JDBCTableCatalog.scala | 9 --------
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 24 ----------------------
7 files changed, 29 insertions(+), 58 deletions(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
index 50643b9cbe32..6f074faf6e58 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/StagingTableCatalog.java
@@ -27,6 +27,7 @@ import
org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.errors.QueryCompilationErrors;
import org.apache.spark.sql.types.StructType;
/**
@@ -61,11 +62,13 @@ public interface StagingTableCatalog extends TableCatalog {
* {@link #stageCreate(Identifier, Column[], Transform[], Map)} instead.
*/
@Deprecated(since = "3.4.0")
- StagedTable stageCreate(
+ default StagedTable stageCreate(
Identifier ident,
StructType schema,
Transform[] partitions,
- Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException;
+ Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException {
+ throw QueryCompilationErrors.mustOverrideOneMethodError("stageCreate");
+ }
/**
* Stage the creation of a table, preparing it to be committed into the
metastore.
@@ -101,11 +104,14 @@ public interface StagingTableCatalog extends TableCatalog
{
* This is deprecated, please override
* {@link #stageReplace(Identifier, StructType, Transform[], Map)} instead.
*/
- StagedTable stageReplace(
+ @Deprecated(since = "3.4.0")
+ default StagedTable stageReplace(
Identifier ident,
StructType schema,
Transform[] partitions,
- Map<String, String> properties) throws NoSuchNamespaceException,
NoSuchTableException;
+ Map<String, String> properties) throws NoSuchNamespaceException,
NoSuchTableException {
+ throw QueryCompilationErrors.mustOverrideOneMethodError("stageReplace");
+ }
/**
* Stage the replacement of a table, preparing it to be committed into the
metastore when the
@@ -151,11 +157,14 @@ public interface StagingTableCatalog extends TableCatalog
{
* This is deprecated, please override
* {@link #stageCreateOrReplace(Identifier, Column[], Transform[], Map)}
instead.
*/
- StagedTable stageCreateOrReplace(
+ @Deprecated(since = "3.4.0")
+ default StagedTable stageCreateOrReplace(
Identifier ident,
StructType schema,
Transform[] partitions,
- Map<String, String> properties) throws NoSuchNamespaceException;
+ Map<String, String> properties) throws NoSuchNamespaceException {
+ throw
QueryCompilationErrors.mustOverrideOneMethodError("stageCreateOrReplace");
+ }
/**
* Stage the creation or replacement of a table, preparing it to be
committed into the metastore
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index ad4fe743218f..ba3470f85338 100644
---
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -195,11 +195,13 @@ public interface TableCatalog extends CatalogPlugin {
* {@link #createTable(Identifier, Column[], Transform[], Map)} instead.
*/
@Deprecated(since = "3.4.0")
- Table createTable(
+ default Table createTable(
Identifier ident,
StructType schema,
Transform[] partitions,
- Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException;
+ Map<String, String> properties) throws TableAlreadyExistsException,
NoSuchNamespaceException {
+ throw QueryCompilationErrors.mustOverrideOneMethodError("createTable");
+ }
/**
* Create a table in the catalog.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 92ab804f0a70..0164af945ca2 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.connector.catalog.{View => _, _}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.{After,
ColumnPosition}
import org.apache.spark.sql.connector.catalog.functions.{AggregateFunction =>
V2AggregateFunction, ScalarFunction, UnboundFunction}
-import org.apache.spark.sql.connector.expressions.{FieldReference,
IdentityTransform, Transform}
+import org.apache.spark.sql.connector.expressions.{FieldReference,
IdentityTransform}
import org.apache.spark.sql.errors.{QueryCompilationErrors,
QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
@@ -87,11 +87,6 @@ object FakeV2SessionCatalog extends TableCatalog with
FunctionCatalog with Suppo
override def loadTable(ident: Identifier): Table = {
throw new NoSuchTableException(ident.asMultipartIdentifier)
}
- override def createTable(
- ident: Identifier,
- schema: StructType,
- partitions: Array[Transform],
- properties: util.Map[String, String]): Table = fail()
override def alterTable(ident: Identifier, changes: TableChange*): Table =
fail()
override def dropTable(ident: Identifier): Boolean = fail()
override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
fail()
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index dae358e4ef0b..159154b014c1 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -21,7 +21,7 @@ import java.util.Locale
import org.apache.hadoop.fs.Path
-import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable,
SparkUnsupportedOperationException}
+import org.apache.spark.{SPARK_DOC_ROOT, SparkException,
SparkRuntimeException, SparkThrowable, SparkUnsupportedOperationException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{ExtendedAnalysisException,
FunctionIdentifier, InternalRow, QualifiedTableName, TableIdentifier}
import
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException,
FunctionAlreadyExistsException, NamespaceAlreadyExistsException,
NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException,
NoSuchTableException, Star, TableAlreadyExistsException, UnresolvedRegex}
@@ -4091,6 +4091,14 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
"createTable(..., Array[Column], ...)")
}
+ def mustOverrideOneMethodError(methodName: String): RuntimeException = {
+ val msg = s"You must override one `$methodName`. It's preferred to not
override the " +
+ "deprecated one."
+ new SparkRuntimeException(
+ "INTERNAL_ERROR",
+ Map("message" -> msg))
+ }
+
def cannotAssignEventTimeColumn(): Throwable = {
new AnalysisException(
errorClass = "CANNOT_ASSIGN_EVENT_TIME_COLUMN_WITHOUT_WATERMARK",
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 0515237adfae..982de88e5884 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -25,8 +25,6 @@ import scala.jdk.CollectionConverters._
import
org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException,
NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException,
TableAlreadyExistsException}
import org.apache.spark.sql.connector.distributions.{Distribution,
Distributions}
import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
class BasicInMemoryTableCatalog extends TableCatalog {
@@ -85,14 +83,6 @@ class BasicInMemoryTableCatalog extends TableCatalog {
invalidatedTables.add(ident)
}
- override def createTable(
- ident: Identifier,
- schema: StructType,
- partitions: Array[Transform],
- properties: util.Map[String, String]): Table = {
- throw QueryCompilationErrors.createTableDeprecatedError()
- }
-
override def createTable(
ident: Identifier,
columns: Array[Column],
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index e7a3fe0f8aa7..3871bdf50177 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.errors.{DataTypeErrorsBase,
QueryCompilationErrors,
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions,
JdbcOptionsInWrite, JDBCRDD, JdbcUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
-import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
class JDBCTableCatalog extends TableCatalog
@@ -144,14 +143,6 @@ class JDBCTableCatalog extends TableCatalog
}
}
- override def createTable(
- ident: Identifier,
- schema: StructType,
- partitions: Array[Transform],
- properties: java.util.Map[String, String]): Table = {
- throw QueryCompilationErrors.createTableDeprecatedError()
- }
-
override def createTable(
ident: Identifier,
columns: Array[Column],
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index de398c49a0eb..a61a266c1ed5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -3870,14 +3870,6 @@ class ReadOnlyCatalog extends InMemoryCatalog {
}
class FakeStagedTableCatalog extends InMemoryCatalog with StagingTableCatalog {
- override def stageCreate(
- ident: Identifier,
- schema: StructType,
- partitions: Array[Transform],
- properties: util.Map[String, String]): StagedTable = {
- throw new RuntimeException("shouldn't be called")
- }
-
override def stageCreate(
ident: Identifier,
columns: Array[ColumnV2],
@@ -3887,14 +3879,6 @@ class FakeStagedTableCatalog extends InMemoryCatalog
with StagingTableCatalog {
null
}
- override def stageReplace(
- ident: Identifier,
- schema: StructType,
- partitions: Array[Transform],
- properties: util.Map[String, String]): StagedTable = {
- throw new RuntimeException("shouldn't be called")
- }
-
override def stageReplace(
ident: Identifier,
columns: Array[ColumnV2],
@@ -3905,14 +3889,6 @@ class FakeStagedTableCatalog extends InMemoryCatalog
with StagingTableCatalog {
null
}
- override def stageCreateOrReplace(
- ident: Identifier,
- schema: StructType,
- partitions: Array[Transform],
- properties: util.Map[String, String]): StagedTable = {
- throw new RuntimeException("shouldn't be called")
- }
-
override def stageCreateOrReplace(
ident: Identifier,
columns: Array[ColumnV2],
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]