This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ac21cf9ad3a [SPARK-51903][SQL] Validate data on adding a CHECK constraint
6ac21cf9ad3a is described below

commit 6ac21cf9ad3a47c26e8498b3a915800f93b0fdc3
Author: Gengliang Wang <[email protected]>
AuthorDate: Wed May 14 13:30:08 2025 -0700

    [SPARK-51903][SQL] Validate data on adding a CHECK constraint
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for enforcing CHECK constraints when executing the `ALTER TABLE ... ADD CONSTRAINT` statement on V2 tables. Specifically, before adding a new CHECK constraint, Spark will validate the constraint against existing table data to ensure data integrity and compliance with the constraint definition.
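    
    For illustration, a minimal sketch of the new behavior against a hypothetical V2 table `t(id BIGINT, data STRING)` (table and column names are made up; the `NEW_CHECK_CONSTRAINT_VIOLATION` error condition and the `VALID` status are the ones introduced by this change):
    
    ```sql
    INSERT INTO t VALUES (-1, 'a'), (2, 'b');
    
    -- Existing rows are validated first; the row with id = -1 violates the predicate,
    -- so this fails with NEW_CHECK_CONSTRAINT_VIOLATION and no constraint is added.
    ALTER TABLE t ADD CONSTRAINT c1 CHECK (id > 0);
    
    -- A predicate that every existing row satisfies is accepted, and the constraint
    -- is stored with the VALID validation status.
    ALTER TABLE t ADD CONSTRAINT c2 CHECK (id > -5);
    ```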
    
    ### Why are the changes needed?
    
    CHECK constraint enforcement in the `ALTER TABLE ... ADD CONSTRAINT` statement is part of the ANSI SQL standard. Supporting this feature ensures Spark's compliance with ANSI SQL and enhances data integrity by preventing invalid constraints from being added to existing tables.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New UT
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50839 from gengliangwang/alterAddNewCheck.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   6 +
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  12 +-
 .../sql/catalyst/expressions/constraints.scala     |   4 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  11 +-
 .../plans/logical/v2AlterTableCommands.scala       |  33 ++++--
 .../spark/sql/errors/QueryExecutionErrors.scala    |  10 ++
 .../execution/datasources/v2/AlterTableExec.scala  |  34 ++++++
 .../datasources/v2/DataSourceV2Strategy.scala      |  13 +++
 .../command/CheckConstraintParseSuite.scala        |  52 +++++----
 .../command/v2/CheckConstraintSuite.scala          | 130 ++++++++++++++++++---
 10 files changed, 247 insertions(+), 58 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index dc856221bead..45e9a4971fc5 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -4137,6 +4137,12 @@
     ],
     "sqlState" : "07501"
   },
+  "NEW_CHECK_CONSTRAINT_VIOLATION" : {
+    "message" : [
+      "The new check constraint (<expression>) cannot be added because it 
would be violated by existing data in table <tableName>. Please ensure all 
existing rows satisfy the constraint before adding it."
+    ],
+    "sqlState" : "23512"
+  },
   "NONEXISTENT_FIELD_NAME_IN_LIST" : {
     "message" : [
       "Field(s) <nonExistFields> do(es) not exist. Available fields: 
<fieldNames>"
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c60e99654075..431be1dbe8e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -736,6 +736,12 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
           case write: V2WriteCommand if write.resolved =>
             write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
 
+          case a: AddCheckConstraint if !a.checkConstraint.deterministic =>
+            a.checkConstraint.child.failAnalysis(
+              errorClass = "NON_DETERMINISTIC_CHECK_CONSTRAINT",
+              messageParameters = Map("checkCondition" -> a.checkConstraint.condition)
+            )
+
           case alter: AlterTableCommand =>
             checkAlterTableCommand(alter)
 
@@ -1037,12 +1043,6 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
       case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
         checkColumnNotExists("rename", col.path :+ newName, table.schema)
 
-      case AddConstraint(_: ResolvedTable, check: CheckConstraint) if !check.deterministic =>
-        check.child.failAnalysis(
-          errorClass = "NON_DETERMINISTIC_CHECK_CONSTRAINT",
-          messageParameters = Map("checkCondition" -> check.condition)
-        )
-
       case AlterColumns(table: ResolvedTable, specs) =>
         val groupedColumns = specs.groupBy(_.column.name)
         groupedColumns.collect {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala
index da5040fcf339..a27460e2be1c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraints.scala
@@ -129,15 +129,13 @@ case class CheckConstraint(
     val predicate = new V2ExpressionBuilder(child, true).buildPredicate().orNull
     val enforced = userProvidedCharacteristic.enforced.getOrElse(true)
     val rely = userProvidedCharacteristic.rely.getOrElse(false)
-    // TODO(SPARK-51903): Change the status to VALIDATED when we support validation on ALTER TABLE
-    val validateStatus = Constraint.ValidationStatus.UNVALIDATED
     Constraint
       .check(name)
       .predicateSql(condition)
       .predicate(predicate)
       .rely(rely)
       .enforced(enforced)
-      .validationStatus(validateStatus)
+      .validationStatus(Constraint.ValidationStatus.VALID)
       .build()
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8018b36f7282..d6be188d8a0f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -5490,9 +5490,16 @@ class AstBuilder extends DataTypeAstBuilder
     withOrigin(ctx) {
       val tableConstraint = visitTableConstraintDefinition(ctx.tableConstraintDefinition())
       withIdentClause(ctx.identifierReference, identifiers => {
-        val table = UnresolvedTable(identifiers, "ALTER TABLE ... ADD CONSTRAINT")
         val namedConstraint = tableConstraint.withTableName(identifiers.last)
-        AddConstraint(table, namedConstraint)
+        namedConstraint match {
+          case c: CheckConstraint =>
+            val relation = createUnresolvedRelation(ctx.identifierReference)
+            val child = Filter(Not(c.child), relation)
+            AddCheckConstraint(child, c)
+          case _ =>
+            val child = UnresolvedTable(identifiers, "ALTER TABLE ... ADD CONSTRAINT")
+            AddConstraint(child, namedConstraint)
+        }
       })
     }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 4bd6588c0104..25a48d60585c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName, ResolvedTable, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{FieldName, FieldPosition, ResolvedFieldName, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
-import org.apache.spark.sql.catalyst.expressions.{Expression, TableConstraint, Unevaluable}
+import org.apache.spark.sql.catalyst.expressions.{CheckConstraint, Expression, TableConstraint, Unevaluable}
 import org.apache.spark.sql.catalyst.util.{ResolveDefaultColumns, TypeUtils}
 import org.apache.spark.sql.connector.catalog.{TableCatalog, TableChange}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -290,23 +290,34 @@ case class AlterTableCollation(
 }
 
 /**
- * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command.
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Primary Key, Foreign Key,
+ * and Unique constraints.
  */
 case class AddConstraint(
     table: LogicalPlan,
     tableConstraint: TableConstraint) extends AlterTableCommand {
+
   override def changes: Seq[TableChange] = {
     val constraint = tableConstraint.toV2Constraint
-    val validatedTableVersion = table match {
-      case t: ResolvedTable if constraint.enforced() =>
-        t.table.currentVersion()
-      case _ =>
-        null
-    }
-    Seq(TableChange.addConstraint(constraint, validatedTableVersion))
+    // The table version is null because the constraint is not enforced.
+    Seq(TableChange.addConstraint(constraint, null))
   }
 
-  protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(table = newChild)
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(table = newChild)
+}
+
+/**
+ * The logical plan of the ALTER TABLE ... ADD CONSTRAINT command for Check constraints.
+ * It doesn't extend [[AlterTableCommand]] because its child is a filtered table scan rather than
+ * a table reference.
+ */
+case class AddCheckConstraint(
+    child: LogicalPlan,
+    checkConstraint: CheckConstraint) extends UnaryCommand {
+
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
+    copy(child = newChild)
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index cb064d2a6f48..0ae7a958058d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -3026,4 +3026,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
       values: java.util.List[Any]): SparkRuntimeException = {
     checkViolation(constraintName, sqlStr, columns.asScala.zip(values.asScala).toMap)
   }
+
+  def newCheckViolation(sqlStr: String, tableName: String): SparkRuntimeException = {
+    new SparkRuntimeException(
+      errorClass = "NEW_CHECK_CONSTRAINT_VIOLATION",
+      messageParameters = Map(
+        "expression" -> sqlStr,
+        "tableName" -> tableName
+      )
+    )
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
index dd52ca716396..7b69c5d6f6bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AlterTableExec.scala
@@ -21,7 +21,9 @@ import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{IdentifierHelper, MultipartIdentifierHelper}
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 
 /**
  * Physical plan node for altering a table.
@@ -44,3 +46,35 @@ case class AlterTableExec(
     Seq.empty
   }
 }
+
+/**
+ * Physical plan node for adding a check constraint with validation.
+ */
+case class AddCheckConstraintExec(
+    catalog: TableCatalog,
+    ident: Identifier,
+    change: TableChange,
+    condition: String,
+    child: SparkPlan) extends V2CommandExec with UnaryExecNode {
+
+  override def output: Seq[Attribute] = Seq.empty
+
+  override protected def run(): Seq[InternalRow] = {
+    if (child.executeTake(1).nonEmpty) {
+      throw QueryExecutionErrors.newCheckViolation(
+        condition, ident.toQualifiedNameParts(catalog).quoted)
+    }
+    try {
+     catalog.alterTable(ident, change)
+    } catch {
+      case e: IllegalArgumentException if !e.isInstanceOf[SparkThrowable] =>
+        throw QueryExecutionErrors.unsupportedTableChangeError(e)
+    }
+
+    Seq.empty
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = {
+    copy(child = newChild)
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index d1b6d65509e2..9cbea3b69ab7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{toPrettySQL, GeneratedColumn, IdentityColumn, ResolveDefaultColumns, ResolveTableConstraints, V2ExpressionBuilder}
 import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.connector.catalog.{Identifier, StagingTableCatalog, SupportsDeleteV2, SupportsNamespaces, SupportsPartitionManagement, SupportsWrite, Table, TableCapability, TableCatalog, TruncatableTable}
+import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
 import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
@@ -534,6 +535,18 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
       UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil
 
+    case a @ AddCheckConstraint(PhysicalOperation(_, _, d: DataSourceV2ScanRelation), check) =>
+      assert(d.relation.catalog.isDefined, "Catalog should be defined after analysis")
+      assert(d.relation.identifier.isDefined, "Identifier should be defined after analysis")
+      val catalog = d.relation.catalog.get.asTableCatalog
+      val ident = d.relation.identifier.get
+      val condition = a.checkConstraint.condition
+      val change = TableChange.addConstraint(
+        check.toV2Constraint,
+        d.relation.table.currentVersion)
+      ResolveTableConstraints.validateCatalogForTableChange(Seq(change), catalog, ident)
+      AddCheckConstraintExec(catalog, ident, change, condition, planLater(a.child)) :: Nil
+
     case a: AlterTableCommand if a.table.resolved =>
       val table = a.table.asInstanceOf[ResolvedTable]
       ResolveTableConstraints.validateCatalogForTableChange(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CheckConstraintParseSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CheckConstraintParseSuite.scala
index cfae1ea31e0d..2971d34da0e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CheckConstraintParseSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CheckConstraintParseSuite.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.{AddConstraint, ColumnDefinition, CreateTable, ReplaceTable, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AddCheckConstraint, ColumnDefinition, CreateTable, Filter, ReplaceTable, UnresolvedTableSpec}
 import org.apache.spark.sql.types.StringType
 
 class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
@@ -187,11 +187,13 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
         |ALTER TABLE a.b.t ADD CONSTRAINT c1 CHECK (a > 0)
         |""".stripMargin
     val parsed = parsePlan(sql)
-    val expected = AddConstraint(
-      UnresolvedTable(
-        Seq("a", "b", "t"),
-        "ALTER TABLE ... ADD CONSTRAINT"),
-      constraint1)
+    val expected = AddCheckConstraint(
+      Filter(
+        Not(GreaterThan(UnresolvedAttribute("a"), Literal(0))),
+        UnresolvedRelation(Seq("a", "b", "t"))
+      ),
+      constraint1
+    )
     comparePlans(parsed, expected)
   }
 
@@ -224,17 +226,20 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
           |ALTER TABLE a.b.t ADD CONSTRAINT c1 CHECK (d > 0) $enforcedStr $relyStr
            |""".stripMargin
       val parsed = parsePlan(sql)
-      val expected = AddConstraint(
-        UnresolvedTable(
-          Seq("a", "b", "t"),
-          "ALTER TABLE ... ADD CONSTRAINT"),
-        CheckConstraint(
-          child = GreaterThan(UnresolvedAttribute("d"), Literal(0)),
-          condition = "d > 0",
-          userProvidedName = "c1",
-          tableName = "t",
-          userProvidedCharacteristic = characteristic
-        ))
+      val expectedConstraint = CheckConstraint(
+        child = GreaterThan(UnresolvedAttribute("d"), Literal(0)),
+        condition = "d > 0",
+        userProvidedName = "c1",
+        tableName = "t",
+        userProvidedCharacteristic = characteristic
+      )
+      val expected = AddCheckConstraint(
+        Filter(
+          Not(GreaterThan(UnresolvedAttribute("d"), Literal(0))),
+          UnresolvedRelation(Seq("a", "b", "t"))
+        ),
+        expectedConstraint
+      )
       comparePlans(parsed, expected)
     }
   }
@@ -305,11 +310,12 @@ class CheckConstraintParseSuite extends ConstraintParseSuiteBase {
         |""".stripMargin
     val plan = parsePlan(sql)
     plan match {
-      case a: AddConstraint =>
-        val table = a.table.asInstanceOf[UnresolvedTable]
-        assert(table.multipartIdentifier == Seq("a", "b", "t"))
-        assert(a.tableConstraint == unnamedConstraint)
-        assert(a.tableConstraint.name.matches("t_chk_[0-9a-f]{7}"))
+      case a: AddCheckConstraint =>
+        comparePlans(a.child, Filter(
+          Not(GreaterThan(UnresolvedAttribute("a"), Literal(0))),
+          UnresolvedRelation(Seq("a", "b", "t"))))
+        assert(a.checkConstraint == unnamedConstraint)
+        assert(a.checkConstraint.name.matches("t_chk_[0-9a-f]{7}"))
 
       case other =>
         fail(s"Expected AddConstraint, but got: $other")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
index 72c531d40aba..0dbda6a1cf6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -134,21 +134,20 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
       val constraint = getCheckConstraint(table)
       assert(constraint.name() == "c1")
       assert(constraint.toDDL ==
-        "CONSTRAINT c1 CHECK (from_json(j, 'a INT').a > 1) ENFORCED 
UNVALIDATED NORELY")
+        "CONSTRAINT c1 CHECK (from_json(j, 'a INT').a > 1) ENFORCED VALID 
NORELY")
       assert(constraint.predicateSql() == "from_json(j, 'a INT').a > 1")
       assert(constraint.predicate() == null)
     }
   }
 
   def getConstraintCharacteristics(): Seq[(String, String)] = {
-    val validStatus = "UNVALIDATED"
     Seq(
-      ("", s"ENFORCED $validStatus NORELY"),
-      ("NORELY", s"ENFORCED $validStatus NORELY"),
-      ("RELY", s"ENFORCED $validStatus RELY"),
-      ("ENFORCED", s"ENFORCED $validStatus NORELY"),
-      ("ENFORCED NORELY", s"ENFORCED $validStatus NORELY"),
-      ("ENFORCED RELY", s"ENFORCED $validStatus RELY")
+      ("", s"ENFORCED VALID NORELY"),
+      ("NORELY", s"ENFORCED VALID NORELY"),
+      ("RELY", s"ENFORCED VALID RELY"),
+      ("ENFORCED", s"ENFORCED VALID NORELY"),
+      ("ENFORCED NORELY", s"ENFORCED VALID NORELY"),
+      ("ENFORCED RELY", s"ENFORCED VALID RELY")
     )
   }
 
@@ -177,7 +176,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
             val constraint = getCheckConstraint(table)
             assert(constraint.name() == "c1")
             assert(constraint.toDDL ==
-              s"CONSTRAINT c1 CHECK (LENGTH(name) > 0) ENFORCED UNVALIDATED 
NORELY")
+              s"CONSTRAINT c1 CHECK (LENGTH(name) > 0) ENFORCED VALID NORELY")
             assert(constraint.predicateSql() == "LENGTH(name) > 0")
           }
         }
@@ -204,11 +203,11 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
       withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
         sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
         assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
-
+        sql(s"INSERT INTO $t VALUES (1, 'a'), (null, 'b')")
         sql(s"ALTER TABLE $t ADD CONSTRAINT c1 CHECK (id > 0) $characteristic")
         val table = loadTable(nonPartitionCatalog, "ns", "tbl")
-        assert(table.currentVersion() == "1")
-        assert(table.validatedVersion() == "0")
+        assert(table.currentVersion() == "2")
+        assert(table.validatedVersion() == "1")
         val constraint = getCheckConstraint(table)
         assert(constraint.name() == "c1")
         assert(constraint.toDDL == s"CONSTRAINT c1 CHECK (id > 0) 
$expectedDDL")
@@ -216,6 +215,111 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
     }
   }
 
+  test("Alter table add new check constraint with violation") {
+    getConstraintCharacteristics().foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+        sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+        assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+        sql(s"INSERT INTO $t VALUES (-1, 'a'), (2, 'b')")
+        val error = intercept[SparkRuntimeException] {
+          sql(s"ALTER TABLE $t ADD CONSTRAINT c1 CHECK (id > 0) 
$characteristic")
+        }
+        checkError(
+          exception = error,
+          condition = "NEW_CHECK_CONSTRAINT_VIOLATION",
+          parameters = Map("expression" -> "id > 0", "tableName" -> 
"non_part_test_catalog.ns.tbl")
+        )
+        assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+      }
+    }
+  }
+
+  test("Alter table add new check constraint with nested column") {
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      sql(s"CREATE TABLE $t (id INT, s STRUCT<num INT, str STRING>) 
$defaultUsing")
+      sql(s"INSERT INTO $t VALUES (1, struct(-1, 'test')), (2, struct(5, 
'valid'))")
+
+      // Add an invalid check constraint
+      val error = intercept[SparkRuntimeException] {
+        sql(s"ALTER TABLE $t ADD CONSTRAINT positive_num CHECK (s.num > 0)")
+      }
+      checkError(
+        exception = error,
+        condition = "NEW_CHECK_CONSTRAINT_VIOLATION",
+        parameters = Map("expression" -> "s.num > 0", "tableName" -> 
"non_part_test_catalog.ns.tbl")
+      )
+      assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+
+      // Add a valid check constraint
+      sql(s"ALTER TABLE $t ADD CONSTRAINT valid_positive_num CHECK (s.num >= 
-1)")
+      val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+      assert(table.currentVersion() == "2")
+      assert(table.validatedVersion() == "1")
+      val constraint = getCheckConstraint(table)
+      assert(constraint.name() == "valid_positive_num")
+      assert(constraint.toDDL ==
+        "CONSTRAINT valid_positive_num CHECK (s.num >= -1) ENFORCED VALID 
NORELY")
+    }
+  }
+
+  test("Alter table add new check constraint with violation - map type 
column") {
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      sql(s"CREATE TABLE $t (id INT, m MAP<STRING, INT>) $defaultUsing")
+      sql(s"INSERT INTO $t VALUES (1, map('a', -1, 'b', 2)), (2, map('a', 5))")
+
+      // Add an invalid check constraint
+      val error = intercept[SparkRuntimeException] {
+        sql(s"ALTER TABLE $t ADD CONSTRAINT positive_map_val CHECK (m['a'] > 
0)")
+      }
+      checkError(
+        exception = error,
+        condition = "NEW_CHECK_CONSTRAINT_VIOLATION",
+        parameters = Map(
+          "expression" -> "m['a'] > 0",
+          "tableName" -> "non_part_test_catalog.ns.tbl")
+      )
+      assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+
+      // Add a valid check constraint
+      sql(s"ALTER TABLE $t ADD CONSTRAINT valid_map_val CHECK (m['a'] >= -1)")
+      val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+      assert(table.currentVersion() == "2")
+      assert(table.validatedVersion() == "1")
+      val constraint = getCheckConstraint(table)
+      assert(constraint.name() == "valid_map_val")
+      assert(constraint.toDDL ==
+        "CONSTRAINT valid_map_val CHECK (m['a'] >= -1) ENFORCED VALID NORELY")
+    }
+  }
+
+  test("Alter table add new check constraint with violation - array type 
column") {
+    withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
+      sql(s"CREATE TABLE $t (id INT, a ARRAY<INT>) $defaultUsing")
+      sql(s"INSERT INTO $t VALUES (1, array(1, -2, 3)), (2, array(5, 6, 7))")
+
+      // Add an invalid check constraint
+      val error = intercept[SparkRuntimeException] {
+        sql(s"ALTER TABLE $t ADD CONSTRAINT positive_array CHECK (a[1] > 0)")
+      }
+      checkError(
+        exception = error,
+        condition = "NEW_CHECK_CONSTRAINT_VIOLATION",
+        parameters = Map("expression" -> "a[1] > 0", "tableName" -> 
"non_part_test_catalog.ns.tbl")
+      )
+      assert(loadTable(nonPartitionCatalog, "ns", "tbl").constraints.isEmpty)
+
+      // Add a valid check constraint
+      sql(s"ALTER TABLE $t ADD CONSTRAINT valid_array CHECK (a[1] >= -2)")
+      val table = loadTable(nonPartitionCatalog, "ns", "tbl")
+      assert(table.currentVersion() == "2")
+      assert(table.validatedVersion() == "1")
+      val constraint = getCheckConstraint(table)
+      assert(constraint.name() == "valid_array")
+      assert(constraint.toDDL ==
+        "CONSTRAINT valid_array CHECK (a[1] >= -2) ENFORCED VALID NORELY")
+    }
+  }
+
   test("Add duplicated check constraint") {
     withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
@@ -232,7 +336,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
           condition = "CONSTRAINT_ALREADY_EXISTS",
           sqlState = "42710",
           parameters = Map("constraintName" -> "abc",
-            "oldConstraint" -> "CONSTRAINT abc CHECK (id > 0) ENFORCED 
UNVALIDATED NORELY")
+            "oldConstraint" -> "CONSTRAINT abc CHECK (id > 0) ENFORCED VALID 
NORELY")
         )
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
