This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b0001fbaa059 [SPARK-51834][SQL][FOLLOW-UP] Add missing 
`.withConstraints()` in `AtomicReplaceTableExec`
b0001fbaa059 is described below

commit b0001fbaa0590ab2f77802dda966ba210e6be1e6
Author: Yan Yan <[email protected]>
AuthorDate: Wed Feb 18 14:34:57 2026 -0800

    [SPARK-51834][SQL][FOLLOW-UP] Add missing `.withConstraints()` in 
`AtomicReplaceTableExec`
    
    ### What changes were proposed in this pull request?
    
    SPARK-51834 added `.withConstraints()` to `CreateTableExec` and 
`ReplaceTableExec` but missed `AtomicReplaceTableExec` in the same file. As a 
result, `REPLACE TABLE` and `CREATE OR REPLACE TABLE` statements silently drop 
their constraints when the catalog implements `StagingTableCatalog`.
    
    Also fixes `StagingInMemoryTableCatalog` to forward constraints when 
constructing `InMemoryTable`, and adds regression tests for all four constraint 
types through the atomic replace path.
    
    ### Why are the changes needed?
    `AtomicReplaceTableExec` does not call `.withConstraints()` when building 
`TableInfo`, so `REPLACE TABLE` and `CREATE OR REPLACE TABLE` silently drop 
constraints when the catalog implements `StagingTableCatalog`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Previously, REPLACE TABLE and CREATE OR REPLACE TABLE with constraints 
would silently drop constraints when using a `StagingTableCatalog`. After this 
fix, constraints are correctly passed through to the catalog.
    
    ### How was this patch tested?
    New unit tests in `UniqueConstraintSuite`, `PrimaryKeyConstraintSuite`, 
`CheckConstraintSuite`, and `ForeignKeyConstraintSuite` that exercise `REPLACE 
TABLE` with constraints through the atomic catalog path 
(`StagingInMemoryTableCatalog`). Verified that all 4 new tests fail without the 
fix and pass with it. All existing tests in these suites continue to pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Claude Opus 4.6 <noreply@anthropic.com>
    
    Closes #54322 from yyanyy/staging_catalog_constraint.
    
    Authored-by: Yan Yan <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../catalog/StagingInMemoryTableCatalog.scala          |  9 ++++++---
 .../execution/datasources/v2/ReplaceTableExec.scala    |  2 ++
 .../execution/command/v2/CheckConstraintSuite.scala    | 14 ++++++++++++++
 .../sql/execution/command/v2/CommandSuiteBase.scala    |  2 ++
 .../command/v2/ForeignKeyConstraintSuite.scala         | 18 ++++++++++++++++++
 .../command/v2/PrimaryKeyConstraintSuite.scala         | 15 +++++++++++++++
 .../execution/command/v2/UniqueConstraintSuite.scala   | 15 +++++++++++++++
 7 files changed, 72 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
index ee2400cab35c..7ded99c709a3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
@@ -36,7 +36,8 @@ class StagingInMemoryTableCatalog extends 
InMemoryTableCatalog with StagingTable
     new TestStagedCreateTable(
       ident,
       new InMemoryTable(s"$name.${ident.quoted}",
-        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
+        tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+        tableInfo.constraints()))
   }
 
   override def stageReplace(ident: Identifier, tableInfo: TableInfo): 
StagedTable = {
@@ -44,7 +45,8 @@ class StagingInMemoryTableCatalog extends 
InMemoryTableCatalog with StagingTable
     new TestStagedReplaceTable(
       ident,
       new InMemoryTable(s"$name.${ident.quoted}",
-        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
+        tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+        tableInfo.constraints()))
   }
 
   override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo) : 
StagedTable = {
@@ -52,7 +54,8 @@ class StagingInMemoryTableCatalog extends 
InMemoryTableCatalog with StagingTable
     new TestStagedCreateOrReplaceTable(
       ident,
       new InMemoryTable(s"$name.${ident.quoted}",
-        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties))
+        tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+        tableInfo.constraints()))
   }
 
   private def validateStagedTable(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
index 7ce95ced0d24..454a4041d36e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -83,6 +83,7 @@ case class AtomicReplaceTableExec(
         .withColumns(columns)
         .withPartitions(partitioning.toArray)
         .withProperties(tableProperties.asJava)
+        .withConstraints(tableSpec.constraints.toArray)
         .build()
       catalog.stageCreateOrReplace(identifier, tableInfo)
     } else if (catalog.tableExists(identifier)) {
@@ -91,6 +92,7 @@ case class AtomicReplaceTableExec(
           .withColumns(columns)
           .withPartitions(partitioning.toArray)
           .withProperties(tableProperties.asJava)
+          .withConstraints(tableSpec.constraints.toArray)
           .build()
         catalog.stageReplace(identifier, tableInfo)
       } catch {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
index ee2dd476958e..8e295ecc3d5d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -205,6 +205,20 @@ class CheckConstraintSuite extends QueryTest with 
CommandSuiteBase with DDLComma
     }
   }
 
+  test("Replace table with check constraint using atomic catalog") {
+    getConstraintCharacteristics().foreach { case (characteristic, 
expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
+        val constraintStr = s"CONSTRAINT c1 CHECK (id > 0) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+        sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr) 
$defaultUsing")
+        val table = loadTable(atomicCatalog, "ns", "tbl")
+        val constraint = getCheckConstraint(table)
+        assert(constraint.name() == "c1")
+        assert(constraint.toDDL == s"CONSTRAINT c1 CHECK (id > 0) 
$expectedDDL")
+      }
+    }
+  }
+
   test("Alter table add check constraint") {
     getConstraintCharacteristics().foreach { case (characteristic, 
expectedDDL) =>
       withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
index 24bc4483d31c..548e15f9e783 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
@@ -35,6 +35,7 @@ trait CommandSuiteBase extends SharedSparkSession {
   def commandVersion: String = "V2" // The command version is added to test 
names
   def catalog: String = "test_catalog" // The default V2 catalog for testing
   def nonPartitionCatalog: String = "non_part_test_catalog" // Catalog for 
non-partitioned tables
+  def atomicCatalog: String = "atomic_test_catalog" // Catalog with 
StagingTableCatalog support
   def rowLevelOPCatalog: String = "row_level_op_catalog"
   def defaultUsing: String = "USING _" // The clause is used in creating v2 
tables under testing
 
@@ -42,6 +43,7 @@ trait CommandSuiteBase extends SharedSparkSession {
   override def sparkConf: SparkConf = super.sparkConf
     .set(s"spark.sql.catalog.$catalog", 
classOf[InMemoryPartitionTableCatalog].getName)
     .set(s"spark.sql.catalog.$nonPartitionCatalog", 
classOf[InMemoryTableCatalog].getName)
+    .set(s"spark.sql.catalog.$atomicCatalog", 
classOf[StagingInMemoryTableCatalog].getName)
     .set(s"spark.sql.catalog.fun_$catalog", classOf[InMemoryCatalog].getName)
     .set(s"spark.sql.catalog.$rowLevelOPCatalog",
       classOf[InMemoryRowLevelOperationTableCatalog].getName)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
index a876013490ea..b1eb85b45063 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
@@ -85,6 +85,24 @@ class ForeignKeyConstraintSuite extends QueryTest with 
CommandSuiteBase with DDL
     }
   }
 
+  test("REPLACE table with foreign key constraint using atomic catalog") {
+    validConstraintCharacteristics.foreach { case (characteristic, 
expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
+        sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+        sql(s"CREATE TABLE ${t}_ref (id bigint, data string) $defaultUsing")
+        val constraintStr = s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
+          s"REFERENCES ${t}_ref(id) $characteristic"
+        sql(s"REPLACE TABLE $t (id bigint, fk bigint, data string, 
$constraintStr) $defaultUsing")
+        val table = loadTable(atomicCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "fk1")
+        assert(constraint.toDDL == s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
+          s"REFERENCES $atomicCatalog.ns.tbl_ref (id) $expectedDDL")
+      }
+    }
+  }
+
   test("Add duplicated foreign key constraint") {
     withNamespaceAndTable("ns", "tbl", catalog) { t =>
       sql(s"CREATE TABLE $t (id bigint, fk bigint, data string) $defaultUsing")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
index f692f9588161..f7d551d4e34d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
@@ -76,6 +76,21 @@ class PrimaryKeyConstraintSuite extends QueryTest with 
CommandSuiteBase with DDL
     }
   }
 
+  test("Replace table with primary key constraint using atomic catalog") {
+    validConstraintCharacteristics.foreach { case (characteristic, 
expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
+        val constraintStr = s"CONSTRAINT pk1 PRIMARY KEY (id) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+        sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr) 
$defaultUsing")
+        val table = loadTable(atomicCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "pk1")
+        assert(constraint.toDDL == s"CONSTRAINT pk1 PRIMARY KEY (id) 
$expectedDDL")
+      }
+    }
+  }
+
   test("Add duplicated primary key constraint") {
     withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
index 6efc3912af9d..96a9945b10f9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
@@ -76,6 +76,21 @@ class UniqueConstraintSuite extends QueryTest with 
CommandSuiteBase with DDLComm
     }
   }
 
+  test("Replace table with unique constraint using atomic catalog") {
+    validConstraintCharacteristics.foreach { case (characteristic, 
expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
+        val constraintStr = s"CONSTRAINT uk1 UNIQUE (id) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+        sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr) 
$defaultUsing")
+        val table = loadTable(atomicCatalog, "ns", "tbl")
+        assert(table.constraints.length == 1)
+        val constraint = table.constraints.head
+        assert(constraint.name() == "uk1")
+        assert(constraint.toDDL == s"CONSTRAINT uk1 UNIQUE (id) $expectedDDL")
+      }
+    }
+  }
+
   test("Add duplicated unique constraint") {
     withNamespaceAndTable("ns", "tbl", catalog) { t =>
       sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to