This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b0001fbaa059 [SPARK-51834][SQL][FOLLOW-UP] Add missing
`.withConstraints()` in `AtomicReplaceTableExec`
b0001fbaa059 is described below
commit b0001fbaa0590ab2f77802dda966ba210e6be1e6
Author: Yan Yan <[email protected]>
AuthorDate: Wed Feb 18 14:34:57 2026 -0800
[SPARK-51834][SQL][FOLLOW-UP] Add missing `.withConstraints()` in
`AtomicReplaceTableExec`
### What changes were proposed in this pull request?
SPARK-51834 added `.withConstraints()` to `CreateTableExec` and
`ReplaceTableExec` but missed `AtomicReplaceTableExec` in the same file. This
causes `REPLACE TABLE` and `CREATE OR REPLACE TABLE` with constraints to
silently drop them when the catalog implements `StagingTableCatalog`.
Also fixes `StagingInMemoryTableCatalog` to forward constraints when
constructing `InMemoryTable`, and adds regression tests for all four constraint
types through the atomic replace path.
### Why are the changes needed?
`AtomicReplaceTableExec` does not call `.withConstraints()` when building
`TableInfo`, so `REPLACE TABLE` and `CREATE OR REPLACE TABLE` silently drop
constraints when the catalog implements `StagingTableCatalog`.
### Does this PR introduce _any_ user-facing change?
Yes. Previously, REPLACE TABLE and CREATE OR REPLACE TABLE with constraints
would silently drop constraints when using a `StagingTableCatalog`. After this
fix, constraints are correctly passed through to the catalog.
### How was this patch tested?
New unit tests in `UniqueConstraintSuite`, `PrimaryKeyConstraintSuite`,
`CheckConstraintSuite`, and `ForeignKeyConstraintSuite` that exercise `REPLACE
TABLE` with constraints through the atomic catalog path
(`StagingInMemoryTableCatalog`). Verified that all 4 new tests fail without the
fix and pass with it. All existing tests in these suites continue to pass.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.6 <noreply@anthropic.com>
Closes #54322 from yyanyy/staging_catalog_constraint.
Authored-by: Yan Yan <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../catalog/StagingInMemoryTableCatalog.scala | 9 ++++++---
.../execution/datasources/v2/ReplaceTableExec.scala | 2 ++
.../execution/command/v2/CheckConstraintSuite.scala | 14 ++++++++++++++
.../sql/execution/command/v2/CommandSuiteBase.scala | 2 ++
.../command/v2/ForeignKeyConstraintSuite.scala | 18 ++++++++++++++++++
.../command/v2/PrimaryKeyConstraintSuite.scala | 15 +++++++++++++++
.../execution/command/v2/UniqueConstraintSuite.scala | 15 +++++++++++++++
7 files changed, 72 insertions(+), 3 deletions(-)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
index ee2400cab35c..7ded99c709a3 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
@@ -36,7 +36,8 @@ class StagingInMemoryTableCatalog extends
InMemoryTableCatalog with StagingTable
new TestStagedCreateTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}",
- tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
+ tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+ tableInfo.constraints()))
}
override def stageReplace(ident: Identifier, tableInfo: TableInfo):
StagedTable = {
@@ -44,7 +45,8 @@ class StagingInMemoryTableCatalog extends
InMemoryTableCatalog with StagingTable
new TestStagedReplaceTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}",
- tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
+ tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+ tableInfo.constraints()))
}
override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo) :
StagedTable = {
@@ -52,7 +54,8 @@ class StagingInMemoryTableCatalog extends
InMemoryTableCatalog with StagingTable
new TestStagedCreateOrReplaceTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}",
- tableInfo.schema(), tableInfo.partitions(), tableInfo.properties))
+ tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+ tableInfo.constraints()))
}
private def validateStagedTable(
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
index 7ce95ced0d24..454a4041d36e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -83,6 +83,7 @@ case class AtomicReplaceTableExec(
.withColumns(columns)
.withPartitions(partitioning.toArray)
.withProperties(tableProperties.asJava)
+ .withConstraints(tableSpec.constraints.toArray)
.build()
catalog.stageCreateOrReplace(identifier, tableInfo)
} else if (catalog.tableExists(identifier)) {
@@ -91,6 +92,7 @@ case class AtomicReplaceTableExec(
.withColumns(columns)
.withPartitions(partitioning.toArray)
.withProperties(tableProperties.asJava)
+ .withConstraints(tableSpec.constraints.toArray)
.build()
catalog.stageReplace(identifier, tableInfo)
} catch {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
index ee2dd476958e..8e295ecc3d5d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -205,6 +205,20 @@ class CheckConstraintSuite extends QueryTest with
CommandSuiteBase with DDLComma
}
}
+ test("Replace table with check constraint using atomic catalog") {
+ getConstraintCharacteristics().foreach { case (characteristic,
expectedDDL) =>
+ withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
+ val constraintStr = s"CONSTRAINT c1 CHECK (id > 0) $characteristic"
+ sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+ sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr)
$defaultUsing")
+ val table = loadTable(atomicCatalog, "ns", "tbl")
+ val constraint = getCheckConstraint(table)
+ assert(constraint.name() == "c1")
+ assert(constraint.toDDL == s"CONSTRAINT c1 CHECK (id > 0)
$expectedDDL")
+ }
+ }
+ }
+
test("Alter table add check constraint") {
getConstraintCharacteristics().foreach { case (characteristic,
expectedDDL) =>
withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
index 24bc4483d31c..548e15f9e783 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
@@ -35,6 +35,7 @@ trait CommandSuiteBase extends SharedSparkSession {
def commandVersion: String = "V2" // The command version is added to test
names
def catalog: String = "test_catalog" // The default V2 catalog for testing
def nonPartitionCatalog: String = "non_part_test_catalog" // Catalog for
non-partitioned tables
+ def atomicCatalog: String = "atomic_test_catalog" // Catalog with
StagingTableCatalog support
def rowLevelOPCatalog: String = "row_level_op_catalog"
def defaultUsing: String = "USING _" // The clause is used in creating v2
tables under testing
@@ -42,6 +43,7 @@ trait CommandSuiteBase extends SharedSparkSession {
override def sparkConf: SparkConf = super.sparkConf
.set(s"spark.sql.catalog.$catalog",
classOf[InMemoryPartitionTableCatalog].getName)
.set(s"spark.sql.catalog.$nonPartitionCatalog",
classOf[InMemoryTableCatalog].getName)
+ .set(s"spark.sql.catalog.$atomicCatalog",
classOf[StagingInMemoryTableCatalog].getName)
.set(s"spark.sql.catalog.fun_$catalog", classOf[InMemoryCatalog].getName)
.set(s"spark.sql.catalog.$rowLevelOPCatalog",
classOf[InMemoryRowLevelOperationTableCatalog].getName)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
index a876013490ea..b1eb85b45063 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ForeignKeyConstraintSuite.scala
@@ -85,6 +85,24 @@ class ForeignKeyConstraintSuite extends QueryTest with
CommandSuiteBase with DDL
}
}
+ test("REPLACE table with foreign key constraint using atomic catalog") {
+ validConstraintCharacteristics.foreach { case (characteristic,
expectedDDL) =>
+ withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
+ sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+ sql(s"CREATE TABLE ${t}_ref (id bigint, data string) $defaultUsing")
+ val constraintStr = s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
+ s"REFERENCES ${t}_ref(id) $characteristic"
+ sql(s"REPLACE TABLE $t (id bigint, fk bigint, data string,
$constraintStr) $defaultUsing")
+ val table = loadTable(atomicCatalog, "ns", "tbl")
+ assert(table.constraints.length == 1)
+ val constraint = table.constraints.head
+ assert(constraint.name() == "fk1")
+ assert(constraint.toDDL == s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
+ s"REFERENCES $atomicCatalog.ns.tbl_ref (id) $expectedDDL")
+ }
+ }
+ }
+
test("Add duplicated foreign key constraint") {
withNamespaceAndTable("ns", "tbl", catalog) { t =>
sql(s"CREATE TABLE $t (id bigint, fk bigint, data string) $defaultUsing")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
index f692f9588161..f7d551d4e34d 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala
@@ -76,6 +76,21 @@ class PrimaryKeyConstraintSuite extends QueryTest with
CommandSuiteBase with DDL
}
}
+ test("Replace table with primary key constraint using atomic catalog") {
+ validConstraintCharacteristics.foreach { case (characteristic,
expectedDDL) =>
+ withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
+ val constraintStr = s"CONSTRAINT pk1 PRIMARY KEY (id) $characteristic"
+ sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+ sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr)
$defaultUsing")
+ val table = loadTable(atomicCatalog, "ns", "tbl")
+ assert(table.constraints.length == 1)
+ val constraint = table.constraints.head
+ assert(constraint.name() == "pk1")
+ assert(constraint.toDDL == s"CONSTRAINT pk1 PRIMARY KEY (id)
$expectedDDL")
+ }
+ }
+ }
+
test("Add duplicated primary key constraint") {
withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
index 6efc3912af9d..96a9945b10f9 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala
@@ -76,6 +76,21 @@ class UniqueConstraintSuite extends QueryTest with
CommandSuiteBase with DDLComm
}
}
+ test("Replace table with unique constraint using atomic catalog") {
+ validConstraintCharacteristics.foreach { case (characteristic,
expectedDDL) =>
+ withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
+ val constraintStr = s"CONSTRAINT uk1 UNIQUE (id) $characteristic"
+ sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+ sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr)
$defaultUsing")
+ val table = loadTable(atomicCatalog, "ns", "tbl")
+ assert(table.constraints.length == 1)
+ val constraint = table.constraints.head
+ assert(constraint.name() == "uk1")
+ assert(constraint.toDDL == s"CONSTRAINT uk1 UNIQUE (id) $expectedDDL")
+ }
+ }
+ }
+
test("Add duplicated unique constraint") {
withNamespaceAndTable("ns", "tbl", catalog) { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]