This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a803c38c38d9 [SPARK-55716][SQL][FOLLOWUP] Simplify NOT NULL 
preservation by skipping asNullable in resolveRelation
a803c38c38d9 is described below

commit a803c38c38d91464fd2ac0b7cadaa1577be9502f
Author: Kent Yao <[email protected]>
AuthorDate: Wed Mar 4 15:25:49 2026 +0800

    [SPARK-55716][SQL][FOLLOWUP] Simplify NOT NULL preservation by skipping 
asNullable in resolveRelation
    
    ### What changes were proposed in this pull request?
    
    Followup to #54517. Simplifies NOT NULL constraint preservation per [review 
feedback](https://github.com/apache/spark/pull/54517#discussion_r2878532653).
    
    Instead of calling `dataSchema.asNullable` in `resolveRelation()` and then 
restoring nullability with recursive 
`restoreNullability`/`restoreDataTypeNullability` helpers in 
`CreateDataSourceTableCommand`, this PR:
    
    1. Adds a `forceNullable` parameter to `DataSource.resolveRelation()` 
(default `true`, preserving existing behavior)
    2. Passes `forceNullable = !enforceNotNull` from 
`CreateDataSourceTableCommand`, so `asNullable` is skipped only when the config 
is enabled
    3. Removes `restoreNullability` and `restoreDataTypeNullability` helper 
methods entirely
    
    **Data flow:**
    - **Config OFF** (default): `forceNullable = true` → `asNullable` runs → 
same behavior as before SPARK-55716
    - **Config ON**: `forceNullable = false` → `asNullable` skipped → catalog 
stores NOT NULL → `PreprocessTableInsertion` enforces at insert time
    
    ### Why are the changes needed?
    
    Addresses review feedback: "is it simpler to not do `dataSchema.asNullable` 
if the flag is on?"
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing SPARK-55716 tests — 7 tests in InsertSuite and 30 tests in 
ShowCreateTableSuite — all pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes, co-authored with GitHub Copilot.
    
    Closes #54597 from yaooqinn/SPARK-55716.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../execution/command/createDataSourceTables.scala | 50 +++-------------------
 .../sql/execution/datasources/DataSource.scala     |  7 ++-
 .../command/v1/ShowCreateTableSuite.scala          |  2 +-
 .../thriftserver/SparkMetadataOperationSuite.scala |  6 +--
 4 files changed, 14 insertions(+), 51 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2edac5b0179b..0415a33e2d6d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -28,8 +28,9 @@ import 
org.apache.spark.sql.classic.ClassicConversions.castToImpl
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.ArrayImplicits._
 
 /**
@@ -79,7 +80,9 @@ case class CreateDataSourceTableCommand(table: CatalogTable, 
ignoreIfExists: Boo
         bucketSpec = table.bucketSpec,
         options = table.storage.properties ++ pathOption,
         // As discussed in SPARK-19583, we don't check if the location is 
existed
-        catalogTable = 
Some(tableWithDefaultOptions)).resolveRelation(checkFilesExist = false)
+        catalogTable = Some(tableWithDefaultOptions))
+        .resolveRelation(checkFilesExist = false,
+          forceNullable = 
!sessionState.conf.getConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL))
 
     val partitionColumnNames = if (table.schema.nonEmpty) {
       table.partitionColumnNames
@@ -107,17 +110,8 @@ case class CreateDataSourceTableCommand(table: 
CatalogTable, ignoreIfExists: Boo
         table.copy(schema = new StructType(), partitionColumnNames = Nil)
 
       case _ =>
-        // Merge nullability from the user-specified schema into the resolved 
schema.
-        // DataSource.resolveRelation() calls dataSchema.asNullable which 
strips NOT NULL
-        // constraints. We restore nullability from the original user schema 
while keeping
-        // the resolved data types (which may include CharVarchar 
normalization, metadata, etc.)
-        val resolvedSchema = if (table.schema.nonEmpty) {
-          restoreNullability(dataSource.schema, table.schema)
-        } else {
-          dataSource.schema
-        }
         table.copy(
-          schema = resolvedSchema,
+          schema = dataSource.schema,
           partitionColumnNames = partitionColumnNames,
           // If metastore partition management for file source tables is 
enabled, we start off with
           // partition provider hive, but no partitions in the metastore. The 
user has to call
@@ -132,38 +126,6 @@ case class CreateDataSourceTableCommand(table: 
CatalogTable, ignoreIfExists: Boo
     Seq.empty[Row]
   }
 
-  /**
-   * Recursively restores nullability from the original user-specified schema 
into
-   * the resolved schema. The resolved schema's data types are preserved (they 
may
-   * contain CharVarchar normalization, metadata, etc.), but nullability flags
-   * (top-level and nested) are taken from the original schema.
-   */
-  private def restoreNullability(resolved: StructType, original: StructType): 
StructType = {
-    val originalFields = original.fields.map(f => f.name -> f).toMap
-    StructType(resolved.fields.map { resolvedField =>
-      originalFields.get(resolvedField.name) match {
-        case Some(origField) =>
-          resolvedField.copy(
-            nullable = origField.nullable,
-            dataType = restoreDataTypeNullability(resolvedField.dataType, 
origField.dataType))
-        case None => resolvedField
-      }
-    })
-  }
-
-  private def restoreDataTypeNullability(resolved: DataType, original: 
DataType): DataType = {
-    (resolved, original) match {
-      case (r: StructType, o: StructType) => restoreNullability(r, o)
-      case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
-        ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
-      case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
-        MapType(
-          restoreDataTypeNullability(rKey, oKey),
-          restoreDataTypeNullability(rVal, oVal),
-          oValNull)
-      case _ => resolved
-    }
-  }
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d2ec3f7ff486..be1f05da308f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -362,7 +362,10 @@ case class DataSource(
    *                        is considered as a non-streaming file based data 
source. Since we know
    *                        that files already exist, we don't need to check 
them again.
    */
-  def resolveRelation(checkFilesExist: Boolean = true, readOnly: Boolean = 
false): BaseRelation = {
+  def resolveRelation(
+      checkFilesExist: Boolean = true,
+      readOnly: Boolean = false,
+      forceNullable: Boolean = true): BaseRelation = {
     val relation = (providingInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
       case (dataSource: SchemaRelationProvider, Some(schema)) =>
@@ -436,7 +439,7 @@ case class DataSource(
         HadoopFsRelation(
           fileCatalog,
           partitionSchema = partitionSchema,
-          dataSchema = dataSchema.asNullable,
+          dataSchema = if (forceNullable) dataSchema.asNullable else 
dataSchema,
           bucketSpec = bucketSpec,
           format,
           caseInsensitiveOptions)(sparkSession)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
index 95b539e58ac6..e65bf1c72bb6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
@@ -185,7 +185,7 @@ trait ShowCreateTableSuiteBase extends 
command.ShowCreateTableSuiteBase
       val showDDL = getShowCreateDDL(t)
       assert(showDDL === Array(
         s"CREATE TABLE $fullName (",
-        "a BIGINT NOT NULL,",
+        "a BIGINT,",
         "b BIGINT DEFAULT 42,",
         "c STRING COLLATE UTF8_BINARY DEFAULT 'abc, \"def\"' COMMENT 
'comment')",
         "USING parquet",
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
index c2a5ca1023e9..b96d00ee43e3 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
@@ -340,12 +340,10 @@ class SparkMetadataOperationSuite extends 
HiveThriftServer2TestBase {
           case _ => assert(radix === 0) // nulls
         }
 
-        val expectedNullable = if (schema(pos).nullable) 1 else 0
-        assert(rowSet.getInt("NULLABLE") === expectedNullable)
+        assert(rowSet.getInt("NULLABLE") === 1)
         assert(rowSet.getString("REMARKS") === pos.toString)
         assert(rowSet.getInt("ORDINAL_POSITION") === pos + 1)
-        val expectedIsNullable = if (schema(pos).nullable) "YES" else "NO"
-        assert(rowSet.getString("IS_NULLABLE") === expectedIsNullable)
+        assert(rowSet.getString("IS_NULLABLE") === "YES")
         assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO")
         pos += 1
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to