This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fccedae3144b [SPARK-55716][SQL] Support NOT NULL constraint 
enforcement for V1 file source table inserts
fccedae3144b is described below

commit fccedae3144bcbf3b0f2ee36da72b89fe17a823d
Author: Kent Yao <[email protected]>
AuthorDate: Tue Mar 3 16:21:50 2026 +0800

    [SPARK-55716][SQL] Support NOT NULL constraint enforcement for V1 file 
source table inserts
    
    ### What changes were proposed in this pull request?
    
    V1 file-based DataSource writes (Parquet, ORC, JSON, etc.) silently accept 
null values into NOT NULL columns. This PR adds opt-in NOT NULL constraint 
enforcement controlled by `spark.sql.fileSource.insert.enforceNotNull`.
    
    **Changes:**
    
    1. **`CreateDataSourceTableCommand`**: Preserves user-specified nullability 
by recursively merging nullability flags from the user schema into the resolved 
`dataSource.schema`. Previously it stored `dataSource.schema` directly, which 
is all-nullable due to `DataSource.resolveRelation()` calling 
`dataSchema.asNullable` (SPARK-13738).
    
    2. **`PreprocessTableInsertion`**: Restores nullability flags from the 
catalog schema before null checks, ensuring `AssertNotNull` is injected when 
needed. Gated behind `spark.sql.fileSource.insert.enforceNotNull`.
    
    3. **New config**: `spark.sql.fileSource.insert.enforceNotNull` (default 
`false`) — when set to `true`, enables NOT NULL constraint enforcement for V1 
file-based tables, consistent with the behavior for other data sources and V2 
catalog tables.
    
    4. **`SparkGetColumnsOperation`**: Fixed `IS_NULLABLE` to respect 
`column.nullable` instead of always returning `"YES"`.
    
    ### Why are the changes needed?
    
    `DataSource.resolveRelation()` calls `dataSchema.asNullable` (added in 
SPARK-13738 for read safety), which strips all NOT NULL constraints 
recursively. `CreateDataSourceTableCommand` then stores this all-nullable 
schema in the catalog, permanently losing NOT NULL information. As a result, 
`PreprocessTableInsertion` never injects `AssertNotNull` for V1 file source 
tables.
    
    Note: `InsertableRelation` (e.g., `SimpleInsertSource`) does NOT have this 
problem because it preserves the original schema (SPARK-24583).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No change in default behavior. Users can opt in to NOT NULL enforcement for 
V1 file source tables by setting `spark.sql.fileSource.insert.enforceNotNull` 
to `true`.
    
    ### How was this patch tested?
    
    - Added 7 new tests in `InsertSuite` covering top-level, nested struct, 
array, and map null constraint enforcement.
    - Fixed 3 existing interval column test assertions in 
`SparkMetadataOperationSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Yes, co-authored with GitHub Copilot.
    
    Closes #54517 from yaooqinn/SPARK-55716.
    
    Lead-authored-by: Kent Yao <[email protected]>
    Co-authored-by: Kent Yao <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  10 ++
 .../execution/command/createDataSourceTables.scala |  46 +++++-
 .../spark/sql/execution/datasources/rules.scala    |  54 +++++-
 .../command/v1/ShowCreateTableSuite.scala          |   2 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 182 +++++++++++++++++++++
 .../thriftserver/SparkGetColumnsOperation.scala    |   2 +-
 .../thriftserver/SparkMetadataOperationSuite.scala |  12 +-
 7 files changed, 297 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e4390c675479..0a0a448ecd2d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4777,6 +4777,16 @@ object SQLConf {
       .enumConf(StoreAssignmentPolicy)
       .createWithDefault(StoreAssignmentPolicy.ANSI)
 
+  val FILE_SOURCE_INSERT_ENFORCE_NOT_NULL =
+    buildConf("spark.sql.fileSource.insert.enforceNotNull")
+      .doc("When true, Spark enforces NOT NULL constraints when inserting data 
into " +
+        "file-based data source tables (e.g., Parquet, ORC, JSON), consistent 
with the " +
+        "behavior for other data sources and V2 catalog tables. " +
+        "When false (default), null values are silently accepted into NOT NULL 
columns.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val ANSI_ENABLED = buildConf(SqlApiConfHelper.ANSI_ENABLED_KEY)
     .doc("When true, Spark SQL uses an ANSI compliant dialect instead of being 
Hive compliant. " +
       "For example, Spark will throw an exception at runtime instead of 
returning null results " +
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 5ef19b832f5b..2edac5b0179b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.util.ArrayImplicits._
 
 /**
@@ -107,8 +107,17 @@ case class CreateDataSourceTableCommand(table: 
CatalogTable, ignoreIfExists: Boo
         table.copy(schema = new StructType(), partitionColumnNames = Nil)
 
       case _ =>
+        // Merge nullability from the user-specified schema into the resolved 
schema.
+        // DataSource.resolveRelation() calls dataSchema.asNullable which 
strips NOT NULL
+        // constraints. We restore nullability from the original user schema 
while keeping
+        // the resolved data types (which may include CharVarchar 
normalization, metadata, etc.)
+        val resolvedSchema = if (table.schema.nonEmpty) {
+          restoreNullability(dataSource.schema, table.schema)
+        } else {
+          dataSource.schema
+        }
         table.copy(
-          schema = dataSource.schema,
+          schema = resolvedSchema,
           partitionColumnNames = partitionColumnNames,
           // If metastore partition management for file source tables is 
enabled, we start off with
           // partition provider hive, but no partitions in the metastore. The 
user has to call
@@ -122,6 +131,39 @@ case class CreateDataSourceTableCommand(table: 
CatalogTable, ignoreIfExists: Boo
 
     Seq.empty[Row]
   }
+
+  /**
+   * Recursively restores nullability from the original user-specified schema 
into
+   * the resolved schema. The resolved schema's data types are preserved (they 
may
+   * contain CharVarchar normalization, metadata, etc.), but nullability flags
+   * (top-level and nested) are taken from the original schema.
+   */
+  private def restoreNullability(resolved: StructType, original: StructType): 
StructType = {
+    val originalFields = original.fields.map(f => f.name -> f).toMap
+    StructType(resolved.fields.map { resolvedField =>
+      originalFields.get(resolvedField.name) match {
+        case Some(origField) =>
+          resolvedField.copy(
+            nullable = origField.nullable,
+            dataType = restoreDataTypeNullability(resolvedField.dataType, 
origField.dataType))
+        case None => resolvedField
+      }
+    })
+  }
+
+  private def restoreDataTypeNullability(resolved: DataType, original: 
DataType): DataType = {
+    (resolved, original) match {
+      case (r: StructType, o: StructType) => restoreNullability(r, o)
+      case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
+        ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
+      case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
+        MapType(
+          restoreDataTypeNullability(rKey, oKey),
+          restoreDataTypeNullability(rVal, oVal),
+          oValNull)
+      case _ => resolved
+    }
+  }
 }
 
 /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index f097e1aa6379..acb113f81bac 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -39,7 +39,7 @@ import 
org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.InsertableRelation
-import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.PartitioningUtils.normalizePartitionSpec
 import org.apache.spark.sql.util.SchemaUtils
 import org.apache.spark.util.ArrayImplicits._
@@ -470,7 +470,29 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
       insert.partitionSpec, partColNames, tblName, conf.resolver)
 
     val staticPartCols = normalizedPartSpec.filter(_._2.isDefined).keySet
-    val expectedColumns = insert.table.output.filterNot(a => 
staticPartCols.contains(a.name))
+    val expectedColumns = {
+      val cols = insert.table.output.filterNot(a => 
staticPartCols.contains(a.name))
+      // When spark.sql.fileSource.insert.enforceNotNull is enabled, restore 
the original nullability from the
+      // catalog table schema. HadoopFsRelation forces dataSchema.asNullable 
for safe reads,
+      // which strips NOT NULL constraints (both top-level and nested) from the
+      // LogicalRelation output. We restore nullability so that AssertNotNull 
checks are
+      // properly injected.
+      if (conf.getConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL)) {
+        catalogTable.map { ct =>
+          val catalogFields = ct.schema.fields.map(f => f.name -> f).toMap
+          cols.map { col =>
+            catalogFields.get(col.name) match {
+              case Some(field) =>
+                col.withNullability(field.nullable)
+                  .withDataType(restoreDataTypeNullability(col.dataType, 
field.dataType))
+              case None => col
+            }
+          }
+        }.getOrElse(cols)
+      } else {
+        cols
+      }
+    }
 
     val partitionsTrackedByCatalog = catalogTable.isDefined &&
       catalogTable.get.partitionColumnNames.nonEmpty &&
@@ -546,6 +568,34 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
         case _ => i
       }
   }
+
+  /**
+   * Recursively restores nullability flags from the original data type into 
the resolved
+   * data type, keeping the resolved type structure intact.
+   */
+  private def restoreDataTypeNullability(resolved: DataType, original: 
DataType): DataType = {
+    (resolved, original) match {
+      case (r: StructType, o: StructType) =>
+        val origFields = o.fields.map(f => f.name -> f).toMap
+        StructType(r.fields.map { rf =>
+          origFields.get(rf.name) match {
+            case Some(of) =>
+              rf.copy(
+                nullable = of.nullable,
+                dataType = restoreDataTypeNullability(rf.dataType, 
of.dataType))
+            case None => rf
+          }
+        })
+      case (ArrayType(rElem, _), ArrayType(oElem, oNull)) =>
+        ArrayType(restoreDataTypeNullability(rElem, oElem), oNull)
+      case (MapType(rKey, rVal, _), MapType(oKey, oVal, oValNull)) =>
+        MapType(
+          restoreDataTypeNullability(rKey, oKey),
+          restoreDataTypeNullability(rVal, oVal),
+          oValNull)
+      case _ => resolved
+    }
+  }
 }
 
 /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
index e65bf1c72bb6..95b539e58ac6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowCreateTableSuite.scala
@@ -185,7 +185,7 @@ trait ShowCreateTableSuiteBase extends 
command.ShowCreateTableSuiteBase
       val showDDL = getShowCreateDDL(t)
       assert(showDDL === Array(
         s"CREATE TABLE $fullName (",
-        "a BIGINT,",
+        "a BIGINT NOT NULL,",
         "b BIGINT DEFAULT 42,",
         "c STRING COLLATE UTF8_BINARY DEFAULT 'abc, \"def\"' COMMENT 
'comment')",
         "USING parquet",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 4cc8db226b43..ea5651238f45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -2853,6 +2853,188 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
     }
   }
 
+  test("SPARK-55716: V1 INSERT rejects null into NOT NULL column for file 
sources") {
+    Seq("parquet", "orc", "json").foreach { format =>
+      withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING 
$format")
+          // V1 DataSource writes now enforce NOT NULL constraints via 
AssertNotNull
+          val e1 = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(null, 'a')")
+          }
+          assert(e1.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+          val e2 = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(1, null)")
+          }
+          assert(e2.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+          // Valid insert should succeed
+          sql("INSERT INTO t VALUES(1, 'a')")
+          checkAnswer(spark.table("t"), Seq(Row(1, "a")))
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT NOT NULL enforcement respects 
storeAssignmentPolicy") {
+    Seq("parquet", "orc").foreach { format =>
+      // ANSI mode (default): rejects null
+      withSQLConf(
+        SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true",
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> 
SQLConf.StoreAssignmentPolicy.ANSI.toString) {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          val e = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(null)")
+          }
+          assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        }
+      }
+      // STRICT mode: also rejects null (fails at analysis with type mismatch)
+      withSQLConf(
+        SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true",
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> 
SQLConf.StoreAssignmentPolicy.STRICT.toString) {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          intercept[AnalysisException] {
+            sql("INSERT INTO t VALUES(null)")
+          }
+        }
+      }
+      // LEGACY mode: allows null (no AssertNotNull injected)
+      withSQLConf(
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> 
SQLConf.StoreAssignmentPolicy.LEGACY.toString) {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          sql("INSERT INTO t VALUES(null)")
+          checkAnswer(spark.table("t"), Seq(Row(null)))
+        }
+      }
+      // Config disabled (default): allows null even in ANSI mode
+      withSQLConf(
+        SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "false") {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL) USING $format")
+          sql("INSERT INTO t VALUES(null)")
+          checkAnswer(spark.table("t"), Seq(Row(null)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null with V2 file source path") {
+    Seq("parquet", "orc").foreach { format =>
+      withSQLConf(
+        SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true",
+        SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+        withTable("t") {
+          sql(s"CREATE TABLE t(i INT NOT NULL, s STRING NOT NULL) USING 
$format")
+          val e = intercept[SparkRuntimeException] {
+            sql("INSERT INTO t VALUES(null, 'a')")
+          }
+          assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null array element for NOT NULL element 
type") {
+    Seq("parquet", "orc").foreach { format =>
+      withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") {
+      withTable("t") {
+        val schema = new StructType()
+          .add("a", ArrayType(IntegerType, containsNull = false))
+        spark.sessionState.catalog.createTable(
+          CatalogTable(
+            identifier = TableIdentifier("t"),
+            tableType = CatalogTableType.MANAGED,
+            storage = CatalogStorageFormat.empty,
+            schema = schema,
+            provider = Some(format)),
+          ignoreIfExists = false)
+        val e = intercept[SparkRuntimeException] {
+          sql("INSERT INTO t SELECT array(1, null, 3)")
+        }
+        assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        // Valid insert should succeed
+        sql("INSERT INTO t SELECT array(1, 2, 3)")
+        checkAnswer(spark.table("t"), Seq(Row(Seq(1, 2, 3))))
+      }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null struct field for NOT NULL field") {
+    Seq("parquet", "orc").foreach { format =>
+      withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") {
+      withTable("t") {
+        val schema = new StructType()
+          .add("s", new StructType()
+            .add("x", IntegerType, nullable = false)
+            .add("y", StringType, nullable = false))
+        spark.sessionState.catalog.createTable(
+          CatalogTable(
+            identifier = TableIdentifier("t"),
+            tableType = CatalogTableType.MANAGED,
+            storage = CatalogStorageFormat.empty,
+            schema = schema,
+            provider = Some(format)),
+          ignoreIfExists = false)
+        val e = intercept[SparkRuntimeException] {
+          sql("INSERT INTO t SELECT named_struct('x', null, 'y', 'hello')")
+        }
+        assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        // Valid insert should succeed
+        sql("INSERT INTO t SELECT named_struct('x', 1, 'y', 'hello')")
+        checkAnswer(spark.table("t"), Seq(Row(Row(1, "hello"))))
+      }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 INSERT rejects null map value for NOT NULL value 
type") {
+    Seq("parquet", "orc").foreach { format =>
+      withSQLConf(SQLConf.FILE_SOURCE_INSERT_ENFORCE_NOT_NULL.key -> "true") {
+      withTable("t") {
+        val schema = new StructType()
+          .add("m", MapType(StringType, IntegerType, valueContainsNull = 
false))
+        spark.sessionState.catalog.createTable(
+          CatalogTable(
+            identifier = TableIdentifier("t"),
+            tableType = CatalogTableType.MANAGED,
+            storage = CatalogStorageFormat.empty,
+            schema = schema,
+            provider = Some(format)),
+          ignoreIfExists = false)
+        val e = intercept[SparkRuntimeException] {
+          sql("INSERT INTO t SELECT map('a', 1, 'b', null)")
+        }
+        assert(e.getCondition === "NOT_NULL_ASSERT_VIOLATION")
+        // Valid insert should succeed
+        sql("INSERT INTO t SELECT map('a', 1, 'b', 2)")
+        checkAnswer(spark.table("t"), Seq(Row(Map("a" -> 1, "b" -> 2))))
+      }
+      }
+    }
+  }
+
+  test("SPARK-55716: V1 DataFrame write ignores NOT NULL schema constraint") {
+    Seq("parquet", "orc").foreach { format =>
+      withTempPath { path =>
+        val data = Seq(Row(null, "hello", 1.0), Row(1, null, 2.0), Row(2, 
"world", null))
+        val df = spark.createDataFrame(
+          spark.sparkContext.parallelize(data),
+          new StructType()
+            .add("id", IntegerType, nullable = true)
+            .add("name", StringType, nullable = true)
+            .add("value", DoubleType, nullable = true))
+        // V1 DataSource writes do not enforce NOT NULL constraints
+        
df.write.mode(SaveMode.Overwrite).format(format).save(path.getCanonicalPath)
+        val result = spark.read.format(format).load(path.getCanonicalPath)
+        checkAnswer(result, data)
+      }
+    }
+  }
+
   test("UNSUPPORTED_OVERWRITE.PATH: Can't overwrite a path that is also being 
read from") {
     val tableName = "t1"
     withTable(tableName) {
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index d3eec329efc1..01b94c1e9cba 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -224,7 +224,7 @@ private[hive] class SparkGetColumnsOperation(
           null, // SQL_DATETIME_SUB
           null, // CHAR_OCTET_LENGTH
           ordinal.asInstanceOf[AnyRef], // ORDINAL_POSITION, 1-based
-          "YES", // IS_NULLABLE
+          (if (column.nullable) "YES" else "NO"), // IS_NULLABLE
           null, // SCOPE_CATALOG
           null, // SCOPE_SCHEMA
           null, // SCOPE_TABLE
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
index a10d2974db74..c2a5ca1023e9 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala
@@ -340,10 +340,12 @@ class SparkMetadataOperationSuite extends 
HiveThriftServer2TestBase {
           case _ => assert(radix === 0) // nulls
         }
 
-        assert(rowSet.getInt("NULLABLE") === 1)
+        val expectedNullable = if (schema(pos).nullable) 1 else 0
+        assert(rowSet.getInt("NULLABLE") === expectedNullable)
         assert(rowSet.getString("REMARKS") === pos.toString)
         assert(rowSet.getInt("ORDINAL_POSITION") === pos + 1)
-        assert(rowSet.getString("IS_NULLABLE") === "YES")
+        val expectedIsNullable = if (schema(pos).nullable) "YES" else "NO"
+        assert(rowSet.getString("IS_NULLABLE") === expectedIsNullable)
         assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO")
         pos += 1
       }
@@ -374,7 +376,7 @@ class SparkMetadataOperationSuite extends 
HiveThriftServer2TestBase {
         assert(rowSet.getInt("NULLABLE") === 0)
         assert(rowSet.getString("REMARKS") === "")
         assert(rowSet.getInt("ORDINAL_POSITION") === 1)
-        assert(rowSet.getString("IS_NULLABLE") === "YES")
+        assert(rowSet.getString("IS_NULLABLE") === "NO")
         assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO")
       }
     }
@@ -402,7 +404,7 @@ class SparkMetadataOperationSuite extends 
HiveThriftServer2TestBase {
         assert(rowSet.getInt("NULLABLE") === 0)
         assert(rowSet.getString("REMARKS") === "")
         assert(rowSet.getInt("ORDINAL_POSITION") === 1)
-        assert(rowSet.getString("IS_NULLABLE") === "YES")
+        assert(rowSet.getString("IS_NULLABLE") === "NO")
         assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO")
       }
     }
@@ -428,7 +430,7 @@ class SparkMetadataOperationSuite extends 
HiveThriftServer2TestBase {
         assert(rowSet.getInt("NULLABLE") === 0)
         assert(rowSet.getString("REMARKS") === "")
         assert(rowSet.getInt("ORDINAL_POSITION") === 1)
-        assert(rowSet.getString("IS_NULLABLE") === "YES")
+        assert(rowSet.getString("IS_NULLABLE") === "NO")
         assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO")
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to