This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a803c38c38d9 [SPARK-55716][SQL][FOLLOWUP] Simplify NOT NULL
preservation by skipping asNullable in resolveRelation
a803c38c38d9 is described below
commit a803c38c38d91464fd2ac0b7cadaa1577be9502f
Author: Kent Yao <[email protected]>
AuthorDate: Wed Mar 4 15:25:49 2026 +0800
[SPARK-55716][SQL][FOLLOWUP] Simplify NOT NULL preservation by skipping
asNullable in resolveRelation
### What changes were proposed in this pull request?
Follow-up to #54517. Simplifies NOT NULL constraint preservation per [review
feedback](https://github.com/apache/spark/pull/54517#discussion_r2878532653).
Instead of calling `dataSchema.asNullable` in `resolveRelation()` and then
restoring nullability with recursive
`restoreNullability`/`restoreDataTypeNullability` helpers in
`CreateDataSourceTableCommand`, this PR:
1. Adds a `forceNullable` parameter to `DataSource.resolveRelation()`
(default `true`, preserving existing behavior)
2. Passes `forceNullable = !enforceNotNull` from
`CreateDataSourceTableCommand`, so `asNullable` is skipped only when the config
is enabled
3. Removes `restoreNullability` and `restoreDataTypeNullability` helper
methods entirely
**Data flow:**
- **Config OFF** (default): `forceNullable = true` → `asNullable` runs →
same behavior as before SPARK-55716
- **Config ON**: `forceNullable = false` → `asNullable` skipped → catalog
stores NOT NULL → `PreprocessTableInsertion` enforces at insert time
### Why are the changes needed?
Addresses review feedback: "is it simpler to not do `dataSchema.asNullable`
if the flag is on?"
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing SPARK-55716 tests (7 tests in InsertSuite) and
ShowCreateTableSuite (30 tests) all pass.
### Was this patch authored or co-authored using generative AI tooling?
Yes, co-authored with GitHub Copilot.
Closes #54597 from yaooqinn/SPARK-55716.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../execution/command/createDataSourceTables.scala | 50 +++-------------------
.../sql/execution/datasources/DataSource.scala | 7 ++-
.../command/v1/ShowCreateTableSuite.scala | 2 +-
.../thriftserver/SparkMetadataOperationSuite.scala | 6 +--
4 files changed, 14 insertions(+), 51 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2edac5b0179b..0415a33e2d6d 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -28,8 +28,9 @@ import
org.apache.spark.sql.classic.ClassicConversions.castToImpl
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.CommandExecutionMode
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ArrayImplicits._
/**
@@ -79,7 +80,9 @@ case class CreateDataSourceTableCommand(table: CatalogTable,
ignoreIfExists: Boo
bucketSpec = table.bucketSpec,
options = table.storage.properties ++ pathOption,
// As discussed in SPARK-19583, we don't check if the location is
existed
- catalogTable =
Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)
+ catalogTable = Some(tableWithDefaultOptions))
+ .resolveRelation(checkFilesExist = false,
+ forceNullable =
!sessionState.conf.getConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL))
val partitionColumnNames = if (table.schema.nonEmpty) {
table.partitionColumnNames
@@ -107,17 +110,8 @@ case class CreateDataSourceTableCommand(table:
CatalogTable, ignoreIfExists: Boo
table.copy(schema = new StructType(), partitionColumnNames = Nil)
case _ =>
- // Merge nullability from the user-specified schema into the resolved
schema.
- // DataSource.resolveRelation() calls dataSchema.asNullable which
strips NOT NULL
- // constraints. We restore nullability from the original user schema
while keeping
- // the resolved data types (which may include CharVarchar
normalization, metadata, etc.)
- val resolvedSchema = if (table.schema.nonEmpty) {
- restoreNullability(dataSource.schema, table.schema)
- } else {
- dataSource.schema
- }
table.copy(
- schema = resolvedSchema,
+ schema = dataSource.schema,
partitionColumnNames = partitionColumnNames,
// If metastore partition management for file source tables is
enabled, we start off with
// partition provider hive, but no partitions in the metastore. The
user has to call
@@ -132,38 +126,6 @@ case class CreateDataSourceTableCommand(table:
CatalogTable, ignoreIfExists: Boo
Seq.empty[Row]
}
- /**
- * Recursively restores nullability from the original user-specified schema
into
- * the resolved schema. The resolved schema's data types are preserved (they
may
- * contain CharVarchar normalization, metadata, etc.), but nullability flags
- * (top-level and nested) are taken from the original schema.
- */
- private def restoreNullability(resolved: StructType, original: StructType):
StructType = {
- val originalFields = original.fields.map(f => f.name -> f).toMap
- StructType(resolved.fields.map { resolvedField =>
- originalFields.get(resolvedField.name) match {
- case Some(origField) =>
- resolvedField.copy(
- nullable = origField.nullable,
- dataType = restoreDataTypeNullability(resolvedField.dataType,
origField.dataType))
- case None => resolvedField
- }
- })
- }
-
- private def restoreDataTypeNullability(resolved: DataType, original:
DataType): DataType = {
- (resolved, original) match {
- case (r: StructType, o: StructType) => restoreNullability(r, o)
- case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
- ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
- case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
- MapType(
- restoreDataTypeNullability(rKey, oKey),
- restoreDataTypeNullability(rVal, oVal),
- oValNull)
- case _ => resolved
- }
- }
}
/**
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d2ec3f7ff486..be1f05da308f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -362,7 +362,10 @@ case class DataSource(
* is considered as a non-streaming file based data
source. Since we know
* that files already exist, we don't need to check
them again.
*/
- def resolveRelation(checkFilesExist: Boolean = true, readOnly: Boolean =
false): BaseRelation = {
+ def resolveRelation(
+ checkFilesExist: Boolean = true,
+ readOnly: Boolean = false,
+ forceNullable: Boolean = true): BaseRelation = {
val relation = (providingInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
case (dataSource: SchemaRelationProvider, Some(schema)) =>
@@ -436,7 +439,7 @@ case class DataSource(
HadoopFsRelation(
fileCatalog,
partitionSchema = partitionSchema,
- dataSchema = dataSchema.asNullable,
+ dataSchema = if (forceNullable) dataSchema.asNullable else
dataSchema,
bucketSpec = bucketSpec,
format,
caseInsensitiveOptions)(sparkSession)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
index 95b539e58ac6..e65bf1c72bb6 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
@@ -185,7 +185,7 @@ trait ShowCreateTableSuiteBase extends
command.ShowCreateTableSuiteBase
val showDDL = getShowCreateDDL(t)
assert(showDDL === Array(
s"CREATE TABLE $fullName (",
- "a BIGINT NOT NULL,",
+ "a BIGINT,",
"b BIGINT DEFAULT 42,",
"c STRING COLLATE UTF8_BINARY DEFAULT 'abc, \"def\"' COMMENT
'comment')",
"USING parquet",
diff --git
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
index c2a5ca1023e9..b96d00ee43e3 100644
---
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
+++
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
@@ -340,12 +340,10 @@ class SparkMetadataOperationSuite extends
HiveThriftServer2TestBase {
case _ => assert(radix === 0) // nulls
}
- val expectedNullable = if (schema(pos).nullable) 1 else 0
- assert(rowSet.getInt("NULLABLE") === expectedNullable)
+ assert(rowSet.getInt("NULLABLE") === 1)
assert(rowSet.getString("REMARKS") === pos.toString)
assert(rowSet.getInt("ORDINAL_POSITION") === pos + 1)
- val expectedIsNullable = if (schema(pos).nullable) "YES" else "NO"
- assert(rowSet.getString("IS_NULLABLE") === expectedIsNullable)
+ assert(rowSet.getString("IS_NULLABLE") === "YES")
assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO")
pos += 1
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]