This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f9c1a2e848 [SPARK-39844][SQL] Update SQLConf for DEFAULT column 
providers to allow/deny "ALTER TABLE ... ADD COLUMN" commands separately
0f9c1a2e848 is described below

commit 0f9c1a2e848fbdfb17af0555d0c8be7d5a7191bb
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Wed Jul 27 22:35:33 2022 -0700

    [SPARK-39844][SQL] Update SQLConf for DEFAULT column providers to 
allow/deny "ALTER TABLE ... ADD COLUMN" commands separately
    
    ### What changes were proposed in this pull request?
    
    Update `SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS` to allow/deny `ALTER 
TABLE ... ADD COLUMN` commands separately.
    
    ### Why are the changes needed?
    
    When `ALTER TABLE ... ADD COLUMNS` commands assign column DEFAULT values, 
Spark constant-folds these values and stores the result in the `EXISTS_DEFAULT` 
column metadata. This allows the target data source to substitute this value 
for rows where the corresponding field is not present in storage. This 
responsibility falls to each data source.
    
    In order to grant flexibility to certain data sources that are not yet 
ready to support this functionality, in this PR we update the 
`SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS` to support an optional asterisk 
after each table provider. When the asterisk is present, `ALTER TABLE ... ADD 
COLUMN` commands may not include DEFAULT values; when absent, they may include 
them.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    Existing column DEFAULT test coverage + new unit test coverage.
    
    Closes #37256 from dtenedor/default-cannot-add-column.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  | 20 +++++++++--
 .../sql/connector/catalog/CatalogV2Util.scala      | 13 +++++---
 .../spark/sql/errors/QueryCompilationErrors.scala  | 15 +++++++--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  7 +++-
 .../sql/catalyst/catalog/SessionCatalogSuite.scala |  2 +-
 .../spark/sql/execution/command/tables.scala       |  2 +-
 .../execution/datasources/DataSourceStrategy.scala |  2 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |  4 +--
 .../org/apache/spark/sql/sources/InsertSuite.scala | 39 ++++++++++++++++++++++
 9 files changed, 87 insertions(+), 17 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 4865fe378cc..0e7ff50f194 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -90,22 +90,36 @@ object ResolveDefaultColumns {
    * @param tableSchema   represents the names and types of the columns of the 
statement to process.
    * @param tableProvider provider of the target table to store default values 
for, if any.
    * @param statementType name of the statement being processed, such as 
INSERT; useful for errors.
+   * @param addNewColumnToExistingTable true if the statement being processed 
adds a new column to
+   *                                    a table that already exists.
    * @return a copy of `tableSchema` with field metadata updated with the 
constant-folded values.
    */
   def constantFoldCurrentDefaultsToExistDefaults(
       tableSchema: StructType,
       tableProvider: Option[String],
-      statementType: String): StructType = {
+      statementType: String,
+      addNewColumnToExistingTable: Boolean): StructType = {
     if (SQLConf.get.enableDefaultColumns) {
-      val allowedTableProviders: Array[String] =
+      val keywords: Array[String] =
         SQLConf.get.getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
           .toLowerCase().split(",").map(_.trim)
+      val allowedTableProviders: Array[String] =
+        keywords.map(_.stripSuffix("*"))
+      val addColumnExistingTableBannedProviders: Array[String] =
+        keywords.filter(_.endsWith("*")).map(_.stripSuffix("*"))
       val givenTableProvider: String = 
tableProvider.getOrElse("").toLowerCase()
       val newFields: Seq[StructField] = tableSchema.fields.map { field =>
         if (field.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
           // Make sure that the target table has a provider that supports 
default column values.
           if (!allowedTableProviders.contains(givenTableProvider)) {
-            throw 
QueryCompilationErrors.defaultReferencesNotAllowedInDataSource(givenTableProvider)
+            throw QueryCompilationErrors
+              .defaultReferencesNotAllowedInDataSource(statementType, 
givenTableProvider)
+          }
+          if (addNewColumnToExistingTable &&
+            givenTableProvider.nonEmpty &&
+            
addColumnExistingTableBannedProviders.contains(givenTableProvider)) {
+            throw QueryCompilationErrors
+              .addNewDefaultColumnToExistingTableNotAllowed(statementType, 
givenTableProvider)
           }
           val analyzed: Expression = analyze(field, statementType)
           val newMetadata: Metadata = new 
MetadataBuilder().withMetadata(field.metadata)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index a5087d39e98..abd43065048 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -146,7 +146,7 @@ private[sql] object CatalogV2Util {
                 
Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
               val fieldWithComment: StructField =
                 
Option(add.comment).map(fieldWithDefault.withComment).getOrElse(fieldWithDefault)
-              addField(schema, fieldWithComment, add.position(), 
tableProvider, statementType)
+              addField(schema, fieldWithComment, add.position(), 
tableProvider, statementType, true)
             case names =>
               replace(schema, names.init, parent => parent.dataType match {
                 case parentType: StructType =>
@@ -158,7 +158,7 @@ private[sql] object CatalogV2Util {
                       .getOrElse(fieldWithDefault)
                   Some(parent.copy(dataType =
                     addField(parentType, fieldWithComment, add.position(), 
tableProvider,
-                      statementType)))
+                      statementType, true)))
                 case _ =>
                   throw new IllegalArgumentException(s"Not a struct: 
${names.init.last}")
               })
@@ -188,7 +188,8 @@ private[sql] object CatalogV2Util {
               throw new IllegalArgumentException("Field not found: " + name)
             }
             val withFieldRemoved = StructType(struct.fields.filter(_ != 
oldField))
-            addField(withFieldRemoved, oldField, update.position(), 
tableProvider, statementType)
+            addField(withFieldRemoved, oldField, update.position(), 
tableProvider, statementType,
+              false)
           }
 
           update.fieldNames() match {
@@ -230,7 +231,8 @@ private[sql] object CatalogV2Util {
       field: StructField,
       position: ColumnPosition,
       tableProvider: Option[String],
-      statementType: String): StructType = {
+      statementType: String,
+      addNewColumnToExistingTable: Boolean): StructType = {
     val newSchema: StructType = if (position == null) {
       schema.add(field)
     } else if (position.isInstanceOf[First]) {
@@ -244,7 +246,8 @@ private[sql] object CatalogV2Util {
       val (before, after) = schema.fields.splitAt(fieldIndex + 1)
       StructType(before ++ (field +: after))
     }
-    constantFoldCurrentDefaultsToExistDefaults(newSchema, tableProvider, 
statementType)
+    constantFoldCurrentDefaultsToExistDefaults(
+      newSchema, tableProvider, statementType, addNewColumnToExistingTable)
   }
 
   private def replace(
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index c344c64997f..430fceb76b5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2515,10 +2515,19 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
         "literal value")
   }
 
-  def defaultReferencesNotAllowedInDataSource(dataSource: String): Throwable = 
{
+  def defaultReferencesNotAllowedInDataSource(
+      statementType: String, dataSource: String): Throwable = {
     new AnalysisException(
-      s"Failed to execute command because DEFAULT values are not supported for 
target data " +
-        "source with table provider: \"" + dataSource + "\"")
+      s"Failed to execute $statementType command because DEFAULT values are 
not supported for " +
+        "target data source with table provider: \"" + dataSource + "\"")
+  }
+
+  def addNewDefaultColumnToExistingTableNotAllowed(
+      statementType: String, dataSource: String): Throwable = {
+    new AnalysisException(
+      s"Failed to execute $statementType command because DEFAULT values are 
not supported when " +
+        "adding new columns to previously existing target data source with 
table " +
+        "provider: \"" + dataSource + "\"")
   }
 
   def defaultValuesMayNotContainSubQueryExpressions(): Throwable = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 631c89d798f..f78fa8c9ef2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2914,7 +2914,12 @@ object SQLConf {
     buildConf("spark.sql.defaultColumn.allowedProviders")
       .internal()
       .doc("List of table providers wherein SQL commands are permitted to 
assign DEFAULT column " +
-        "values. Comma-separated list, whitespace ignored, case-insensitive.")
+        "values. Comma-separated list, whitespace ignored, case-insensitive. 
If an asterisk " +
+        "appears after any table provider in this list, any command may assign 
DEFAULT column values " +
+        "except `ALTER TABLE ... ADD COLUMN`. Otherwise, if no asterisk 
appears, all commands " +
+        "are permitted. This is useful because in order for such `ALTER TABLE 
... ADD COLUMN` " +
+        "commands to work, the target data source must include support for 
substituting in the " +
+        "provided values when the corresponding fields are not present in 
storage.")
       .version("3.4.0")
       .stringConf
       .createWithDefault("csv,json,orc,parquet")
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 3a248f66c1f..e9b0608a08a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -174,7 +174,7 @@ abstract class SessionCatalogSuite extends AnalysisTest 
with Eventually {
       // disabled.
       withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
         val result: StructType = 
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          db1tbl3.schema, db1tbl3.provider, "CREATE TABLE")
+          db1tbl3.schema, db1tbl3.provider, "CREATE TABLE", false)
         val columnEWithFeatureDisabled: StructField = findField("e", result)
         // No constant-folding has taken place to the EXISTS_DEFAULT metadata.
         assert(!columnEWithFeatureDisabled.metadata.contains("EXISTS_DEFAULT"))
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index d8c5c04082b..a96857ab102 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -290,7 +290,7 @@ case class AlterTableAddColumnsCommand(
     colsToAdd.map { col: StructField =>
       if 
(col.metadata.contains(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY))
 {
         val foldedStructType = 
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          StructType(Seq(col)), tableProvider, "ALTER TABLE ADD COLUMNS")
+          StructType(Seq(col)), tableProvider, "ALTER TABLE ADD COLUMNS", true)
         foldedStructType.fields(0)
       } else {
         col
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7f30300a39c..4c2c8a8ac88 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -149,7 +149,7 @@ case class DataSourceAnalysis(analyzer: Analyzer) extends 
Rule[LogicalPlan] {
     case CreateTable(tableDesc, mode, None) if 
DDLUtils.isDatasourceTable(tableDesc) =>
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          tableDesc.schema, tableDesc.provider, "CREATE TABLE")
+          tableDesc.schema, tableDesc.provider, "CREATE TABLE", false)
       val newTableDesc = tableDesc.copy(schema = newSchema)
       CreateDataSourceTableCommand(newTableDesc, ignoreIfExists = mode == 
SaveMode.Ignore)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 16c6b331d10..e057b43f799 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -173,7 +173,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
         tableSpec, ifNotExists) =>
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          schema, tableSpec.provider, "CREATE TABLE")
+          schema, tableSpec.provider, "CREATE TABLE", false)
       CreateTableExec(catalog.asTableCatalog, ident, newSchema,
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
@@ -195,7 +195,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
     case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts, 
tableSpec, orCreate) =>
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
-          schema, tableSpec.provider, "CREATE TABLE")
+          schema, tableSpec.provider, "CREATE TABLE", false)
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableExec(staging, ident, newSchema, parts,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 7497aa66fa6..7edd61828d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1840,6 +1840,45 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
     }
   }
 
+  test("SPARK-39844 Restrict adding DEFAULT columns for existing tables to 
certain sources") {
+    Seq("csv", "json", "orc", "parquet").foreach { provider =>
+      withTable("t1") {
+        // Set the allowlist of table providers to include the new table type 
for all SQL commands.
+        withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> provider) {
+          // It is OK to create a new table with a column DEFAULT value 
assigned if the table
+          // provider is in the allowlist.
+          sql(s"create table t1(a int default 42) using $provider")
+          // It is OK to add a new column to the table with a DEFAULT value to 
the existing table
+          // since this table provider is not yet present in the
+          // 'ADD_DEFAULT_COLUMN_EXISTING_TABLE_BANNED_PROVIDERS' denylist.
+          sql(s"alter table t1 add column (b string default 'abc')")
+          // Insert a row into the table and check that the assigned DEFAULT 
value is correct.
+          sql(s"insert into t1 values (42, default)")
+          checkAnswer(spark.table("t1"), Row(42, "abc"))
+        }
+        // Now update the allowlist of table providers to prohibit ALTER TABLE 
ADD COLUMN commands
+        // from assigning DEFAULT values.
+        withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> 
s"$provider*") {
+          assert(intercept[AnalysisException] {
+            // Try to add another column to the existing table again. This 
fails because the table
+            // provider is now in the denylist.
+            sql(s"alter table t1 add column (b string default 'abc')")
+          }.getMessage.contains(
+            
QueryCompilationErrors.addNewDefaultColumnToExistingTableNotAllowed(
+              "ALTER TABLE ADD COLUMNS", provider).getMessage))
+          withTable("t2") {
+            // It is still OK to create a new table with a column DEFAULT 
value assigned, even if
+            // the table provider is in the above denylist.
+            sql(s"create table t2(a int default 42) using $provider")
+            // Insert a row into the table and check that the assigned DEFAULT 
value is correct.
+            sql(s"insert into t2 values (default)")
+            checkAnswer(spark.table("t2"), Row(42))
+          }
+        }
+      }
+    }
+  }
+
   test("Stop task set if FileAlreadyExistsException was thrown") {
     Seq(true, false).foreach { fastFail =>
       withSQLConf("fs.file.impl" -> 
classOf[FileExistingTestFileSystem].getName,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to