This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0f9c1a2e848 [SPARK-39844][SQL] Update SQLConf for DEFAULT column
providers to allow/deny "ALTER TABLE ... ADD COLUMN" commands separately
0f9c1a2e848 is described below
commit 0f9c1a2e848fbdfb17af0555d0c8be7d5a7191bb
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Wed Jul 27 22:35:33 2022 -0700
[SPARK-39844][SQL] Update SQLConf for DEFAULT column providers to
allow/deny "ALTER TABLE ... ADD COLUMN" commands separately
### What changes were proposed in this pull request?
Update `SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS` to allow/deny `ALTER
TABLE ... ADD COLUMN` commands separately.
### Why are the changes needed?
When `ALTER TABLE ... ADD COLUMNS` commands assign column DEFAULT values,
Spark constant-folds these values and stores the result in the `EXISTS_DEFAULT`
column metadata. This allows the target data source to substitute this value
for rows where the corresponding field is not present in storage. This
responsibility is up to each data source.
In order to grant flexibility to certain data sources that are not yet
ready to support this functionality, in this PR we update the
`SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS` to support an optional asterisk
after each table provider. When the asterisk is present, `ALTER TABLE ... ADD
COLUMN` commands may not include DEFAULT values; when absent, they may include
them.
### Does this PR introduce _any_ user-facing change?
Yes.
### How was this patch tested?
Existing column DEFAULT test coverage + new unit test coverage.
Closes #37256 from dtenedor/default-cannot-add-column.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 20 +++++++++--
.../sql/connector/catalog/CatalogV2Util.scala | 13 +++++---
.../spark/sql/errors/QueryCompilationErrors.scala | 15 +++++++--
.../org/apache/spark/sql/internal/SQLConf.scala | 7 +++-
.../sql/catalyst/catalog/SessionCatalogSuite.scala | 2 +-
.../spark/sql/execution/command/tables.scala | 2 +-
.../execution/datasources/DataSourceStrategy.scala | 2 +-
.../datasources/v2/DataSourceV2Strategy.scala | 4 +--
.../org/apache/spark/sql/sources/InsertSuite.scala | 39 ++++++++++++++++++++++
9 files changed, 87 insertions(+), 17 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 4865fe378cc..0e7ff50f194 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -90,22 +90,36 @@ object ResolveDefaultColumns {
* @param tableSchema represents the names and types of the columns of the
statement to process.
* @param tableProvider provider of the target table to store default values
for, if any.
* @param statementType name of the statement being processed, such as
INSERT; useful for errors.
+ * @param addNewColumnToExistingTable true if the statement being processed
adds a new column to
+ * a table that already exists.
* @return a copy of `tableSchema` with field metadata updated with the
constant-folded values.
*/
def constantFoldCurrentDefaultsToExistDefaults(
tableSchema: StructType,
tableProvider: Option[String],
- statementType: String): StructType = {
+ statementType: String,
+ addNewColumnToExistingTable: Boolean): StructType = {
if (SQLConf.get.enableDefaultColumns) {
- val allowedTableProviders: Array[String] =
+ val keywords: Array[String] =
SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
.toLowerCase().split(",").map(_.trim)
+ val allowedTableProviders: Array[String] =
+ keywords.map(_.stripSuffix("*"))
+ val addColumnExistingTableBannedProviders: Array[String] =
+ keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
val givenTableProvider: String =
tableProvider.getOrElse("").toLowerCase()
val newFields: Seq[StructField] = tableSchema.fields.map { field =>
if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
// Make sure that the target table has a provider that supports
default column values.
if (!allowedTableProviders.contains(givenTableProvider)) {
- throw
QueryCompilationErrors.defaultReferencesNotAllowedInDataSource(givenTableProvider)
+ throw QueryCompilationErrors
+ .defaultReferencesNotAllowedInDataSource(statementType,
givenTableProvider)
+ }
+ if (addNewColumnToExistingTable &&
+ givenTableProvider.nonEmpty &&
+
addColumnExistingTableBannedProviders.contains(givenTableProvider)) {
+ throw QueryCompilationErrors
+ .addNewDefaultColumnToExistingTableNotAllowed(statementType,
givenTableProvider)
}
val analyzed: Expression = analyze(field, statementType)
val newMetadata: Metadata = new
MetadataBuilder().withMetadata(field.metadata)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index a5087d39e98..abd43065048 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -146,7 +146,7 @@ private[sql] object CatalogV2Util {
Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
val fieldWithComment: StructField =
Option(add.comment).map(fieldWithDefault.withComment).getOrElse(fieldWithDefault)
- addField(schema, fieldWithComment, add.position(),
tableProvider, statementType)
+ addField(schema, fieldWithComment, add.position(),
tableProvider, statementType, true)
case names =>
replace(schema, names.init, parent => parent.dataType match {
case parentType: StructType =>
@@ -158,7 +158,7 @@ private[sql] object CatalogV2Util {
.getOrElse(fieldWithDefault)
Some(parent.copy(dataType =
addField(parentType, fieldWithComment, add.position(),
tableProvider,
- statementType)))
+ statementType, true)))
case _ =>
throw new IllegalArgumentException(s"Not a struct:
${names.init.last}")
})
@@ -188,7 +188,8 @@ private[sql] object CatalogV2Util {
throw new IllegalArgumentException("Field not found: " + name)
}
val withFieldRemoved = StructType(struct.fields.filter(_ !=
oldField))
- addField(withFieldRemoved, oldField, update.position(),
tableProvider, statementType)
+ addField(withFieldRemoved, oldField, update.position(),
tableProvider, statementType,
+ false)
}
update.fieldNames() match {
@@ -230,7 +231,8 @@ private[sql] object CatalogV2Util {
field: StructField,
position: ColumnPosition,
tableProvider: Option[String],
- statementType: String): StructType = {
+ statementType: String,
+ addNewColumnToExistingTable: Boolean): StructType = {
val newSchema: StructType = if (position == null) {
schema.add(field)
} else if (position.isInstanceOf[First]) {
@@ -244,7 +246,8 @@ private[sql] object CatalogV2Util {
val (before, after) = schema.fields.splitAt(fieldIndex + 1)
StructType(before ++ (field +: after))
}
- constantFoldCurrentDefaultsToExistDefaults(newSchema, tableProvider,
statementType)
+ constantFoldCurrentDefaultsToExistDefaults(
+ newSchema, tableProvider, statementType, addNewColumnToExistingTable)
}
private def replace(
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index c344c64997f..430fceb76b5 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2515,10 +2515,19 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase {
"literal value")
}
- def defaultReferencesNotAllowedInDataSource(dataSource: String): Throwable =
{
+ def defaultReferencesNotAllowedInDataSource(
+ statementType: String, dataSource: String): Throwable = {
new AnalysisException(
- s"Failed to execute command because DEFAULT values are not supported for
target data " +
- "source with table provider: \"" + dataSource + "\"")
+ s"Failed to execute $statementType command because DEFAULT values are
not supported for " +
+ "target data source with table provider: \"" + dataSource + "\"")
+ }
+
+ def addNewDefaultColumnToExistingTableNotAllowed(
+ statementType: String, dataSource: String): Throwable = {
+ new AnalysisException(
+ s"Failed to execute $statementType command because DEFAULT values are
not supported when " +
+ "adding new columns to previously existing target data source with
table " +
+ "provider: \"" + dataSource + "\"")
}
def defaultValuesMayNotContainSubQueryExpressions(): Throwable = {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 631c89d798f..f78fa8c9ef2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2914,7 +2914,12 @@ object SQLConf {
buildConf("spark.sql.defaultColumn.allowedProviders")
.internal()
.doc("List of table providers wherein SQL commands are permitted to
assign DEFAULT column " +
- "values. Comma-separated list, whitespace ignored, case-insensitive.")
+ "values. Comma-separated list, whitespace ignored, case-insensitive.
If an asterisk " +
+      "appears after any table provider in this list, any command may assign
DEFAULT column " +
+      "values except `ALTER TABLE ... ADD COLUMN`. Otherwise, if no asterisk
appears, all commands " +
+ "are permitted. This is useful because in order for such `ALTER TABLE
... ADD COLUMN` " +
+ "commands to work, the target data source must include support for
substituting in the " +
+ "provided values when the corresponding fields are not present in
storage.")
.version("3.4.0")
.stringConf
.createWithDefault("csv,json,orc,parquet")
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 3a248f66c1f..e9b0608a08a 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -174,7 +174,7 @@ abstract class SessionCatalogSuite extends AnalysisTest
with Eventually {
// disabled.
withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
val result: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- db1tbl3.schema, db1tbl3.provider, "CREATE TABLE")
+ db1tbl3.schema, db1tbl3.provider, "CREATE TABLE", false)
val columnEWithFeatureDisabled: StructField = findField("e", result)
// No constant-folding has taken place to the EXISTS_DEFAULT metadata.
assert(!columnEWithFeatureDisabled.metadata.contains("EXISTS_DEFAULT"))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index d8c5c04082b..a96857ab102 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -290,7 +290,7 @@ case class AlterTableAddColumnsCommand(
colsToAdd.map { col: StructField =>
if
(col.metadata.contains(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY))
{
val foldedStructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- StructType(Seq(col)), tableProvider, "ALTER TABLE ADD COLUMNS")
+ StructType(Seq(col)), tableProvider, "ALTER TABLE ADD COLUMNS", true)
foldedStructType.fields(0)
} else {
col
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7f30300a39c..4c2c8a8ac88 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -149,7 +149,7 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends
Rule[LogicalPlan] {
case CreateTable(tableDesc, mode, None) if
DDLUtils.isDatasourceTable(tableDesc) =>
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- tableDesc.schema, tableDesc.provider, "CREATE TABLE")
+ tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
val newTableDesc = tableDesc.copy(schema = newSchema)
CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode ==
SaveMode.Ignore)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 16c6b331d10..e057b43f799 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -173,7 +173,7 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
tableSpec, ifNotExists) =>
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- schema, tableSpec.provider, "CREATE TABLE")
+ schema, tableSpec.provider, "CREATE TABLE", false)
CreateTableExec(catalog.asTableCatalog, ident, newSchema,
partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
@@ -195,7 +195,7 @@ class DataSourceV2Strategy(session: SparkSession) extends
Strategy with Predicat
case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts,
tableSpec, orCreate) =>
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
- schema, tableSpec.provider, "CREATE TABLE")
+ schema, tableSpec.provider, "CREATE TABLE", false)
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableExec(staging, ident, newSchema, parts,
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 7497aa66fa6..7edd61828d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1840,6 +1840,45 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
}
+ test("SPARK-39844 Restrict adding DEFAULT columns for existing tables to
certain sources") {
+ Seq("csv", "json", "orc", "parquet").foreach { provider =>
+ withTable("t1") {
+ // Set the allowlist of table providers to include the new table type
for all SQL commands.
+ withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> provider) {
+ // It is OK to create a new table with a column DEFAULT value
assigned if the table
+ // provider is in the allowlist.
+ sql(s"create table t1(a int default 42) using $provider")
+ // It is OK to add a new column to the table with a DEFAULT value to
the existing table
+ // since this table provider is not yet present in the
+ // 'ADD_DEFAULT_COLUMN_EXISTING_TABLE_BANNED_PROVIDERS' denylist.
+ sql(s"alter table t1 add column (b string default 'abc')")
+ // Insert a row into the table and check that the assigned DEFAULT
value is correct.
+ sql(s"insert into t1 values (42, default)")
+ checkAnswer(spark.table("t1"), Row(42, "abc"))
+ }
+ // Now update the allowlist of table providers to prohibit ALTER TABLE
ADD COLUMN commands
+ // from assigning DEFAULT values.
+ withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key ->
s"$provider*") {
+ assert(intercept[AnalysisException] {
+ // Try to add another column to the existing table again. This
fails because the table
+ // provider is now in the denylist.
+ sql(s"alter table t1 add column (b string default 'abc')")
+ }.getMessage.contains(
+
QueryCompilationErrors.addNewDefaultColumnToExistingTableNotAllowed(
+ "ALTER TABLE ADD COLUMNS", provider).getMessage))
+ withTable("t2") {
+ // It is still OK to create a new table with a column DEFAULT
value assigned, even if
+ // the table provider is in the above denylist.
+ sql(s"create table t2(a int default 42) using $provider")
+ // Insert a row into the table and check that the assigned DEFAULT
value is correct.
+ sql(s"insert into t2 values (default)")
+ checkAnswer(spark.table("t2"), Row(42))
+ }
+ }
+ }
+ }
+ }
+
test("Stop task set if FileAlreadyExistsException was thrown") {
Seq(true, false).foreach { fastFail =>
withSQLConf("fs.file.impl" ->
classOf[FileExistingTestFileSystem].getName,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]