This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c04e095779f4 [SPARK-54220][SQL] NullType/VOID/UNKNOWN Type Support in 
Parquet
c04e095779f4 is described below

commit c04e095779f4c9d02152d1db3a510ee21905688f
Author: Ziya Mukhtarov <[email protected]>
AuthorDate: Fri Nov 7 10:29:52 2025 -0800

    [SPARK-54220][SQL] NullType/VOID/UNKNOWN Type Support in Parquet
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for reading/writing NullType columns in Parquet files 
via the `UNKNOWN` logical type annotation. Notable changes are:
    - Changing `ParquetFileFormat.supportDataType` to support NullType
    - Changing `ParquetToSparkSchemaConverter` to infer NullType if a primitive 
type has `UNKNOWN` type annotation and there's no Spark-provided expected type
    - Changing `SparkToParquetSchemaConverter` to convert NullType into a 
Parquet Boolean physical type with `UNKNOWN` annotation (physical type 
selection here is arbitrary)
    - Optimization in `On/OffHeapColumnVector` to not allocate memory if the whole 
vector is guaranteed to hold only nulls, which is the case for NullType columns 
and columns that are missing from the file.
    
    ### Why are the changes needed?
    
    To support reading/writing NullType columns in Parquet files.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Previously, trying to read or write Parquet files where the schema 
contained a NullType column would throw an error. Now, we no longer throw, and 
instead write/read data as expected, using the `UNKNOWN` type annotation.
    
    ### How was this patch tested?
    
    Manually verified that the Parquet files we write are readable by Apache 
Arrow. Also verified that we can read a simple Parquet file with NullType 
written by Apache Arrow. Added a new unit test. Fixed existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52922 from ZiyaZa/parquet-unknown.
    
    Authored-by: Ziya Mukhtarov <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 ++++
 .../datasources/parquet/ParquetColumnVector.java   |  6 +--
 .../parquet/ParquetVectorUpdaterFactory.java       | 44 ++++++++++++++++-
 .../execution/vectorized/OffHeapColumnVector.java  |  6 ++-
 .../execution/vectorized/OnHeapColumnVector.java   |  6 ++-
 .../execution/vectorized/WritableColumnVector.java | 25 ++++++----
 .../datasources/parquet/ParquetFileFormat.scala    |  2 +-
 .../datasources/parquet/ParquetReadSupport.scala   |  4 +-
 .../datasources/parquet/ParquetRowConverter.scala  |  7 +++
 .../parquet/ParquetSchemaConverter.scala           |  8 ++-
 .../datasources/parquet/ParquetUtils.scala         |  4 +-
 .../datasources/parquet/ParquetWriteSupport.scala  |  5 +-
 .../spark/sql/FileBasedDataSourceSuite.scala       |  6 +--
 .../datasources/parquet/ParquetIOSuite.scala       | 57 ++++++++++++++++++++++
 .../datasources/parquet/ParquetSchemaSuite.scala   | 34 +++++++++++++
 .../spark/sql/hive/execution/HiveDDLSuite.scala    | 23 ++-------
 16 files changed, 205 insertions(+), 42 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 64cb365e5822..010003f37c52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1552,6 +1552,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val PARQUET_VECTORIZED_READER_NULL_TYPE_ENABLED =
+    buildConf("spark.sql.parquet.enableNullTypeVectorizedReader")
+      .doc("Enables vectorized Parquet reader support for NullType columns.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val PARQUET_RECORD_FILTER_ENABLED = 
buildConf("spark.sql.parquet.recordLevelFilter.enabled")
     .doc("If true, enables Parquet's native record-level filtering using the 
pushed down " +
       "filters. " +
@@ -6915,6 +6922,9 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
   def parquetVectorizedReaderNestedColumnEnabled: Boolean =
     getConf(PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED)
 
+  def parquetVectorizedReaderNullTypeEnabled: Boolean =
+    getConf(PARQUET_VECTORIZED_READER_NULL_TYPE_ENABLED)
+
   def parquetVectorizedReaderBatchSize: Int = 
getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)
 
   def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index 37c936c84d5f..002b7569a6e0 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -80,7 +80,7 @@ final class ParquetColumnVector {
       }
 
       if (defaultValue == null) {
-        vector.setAllNull();
+        vector.setMissing();
         return;
       }
       // For Parquet tables whose columns have associated DEFAULT values, this 
reader must return
@@ -137,7 +137,7 @@ final class ParquetColumnVector {
 
         // Only use levels from non-missing child, this can happen if only 
some but not all
         // fields of a struct are missing.
-        if (!childCv.vector.isAllNull()) {
+        if (!childCv.vector.isMissing()) {
           allChildrenAreMissing = false;
           this.repetitionLevels = childCv.repetitionLevels;
           this.definitionLevels = childCv.definitionLevels;
@@ -147,7 +147,7 @@ final class ParquetColumnVector {
       // This can happen if all the fields of a struct are missing, in which 
case we should mark
       // the struct itself as a missing column
       if (allChildrenAreMissing) {
-        vector.setAllNull();
+        vector.setMissing();
       }
     }
   }
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index eb6c84b8113b..4f90f878da86 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -26,6 +26,7 @@ import 
org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
 import 
org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.UnknownLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.SparkUnsupportedOperationException;
@@ -70,7 +71,12 @@ public class ParquetVectorUpdaterFactory {
   }
 
   public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType 
sparkType) {
-    PrimitiveType.PrimitiveTypeName typeName = 
descriptor.getPrimitiveType().getPrimitiveTypeName();
+    PrimitiveType type = descriptor.getPrimitiveType();
+    PrimitiveType.PrimitiveTypeName typeName = type.getPrimitiveTypeName();
+    boolean isUnknownType = type.getLogicalTypeAnnotation() instanceof 
UnknownLogicalTypeAnnotation;
+    if (isUnknownType && sparkType instanceof NullType) {
+      return new NullTypeUpdater();
+    }
 
     switch (typeName) {
       case BOOLEAN -> {
@@ -244,6 +250,42 @@ public class ParquetVectorUpdaterFactory {
       !annotation.isSigned() && annotation.getBitWidth() == bitWidth;
   }
 
+  /**
+   * Updater should not be called if all values are nulls, so all methods 
throw exception here.
+   */
+  private static class NullTypeUpdater implements ParquetVectorUpdater {
+    @Override
+    public void readValues(
+        int total,
+        int offset,
+        WritableColumnVector values,
+        VectorizedValuesReader valuesReader) {
+      throw SparkUnsupportedOperationException.apply();
+    }
+
+    @Override
+    public void skipValues(int total, VectorizedValuesReader valuesReader) {
+      throw SparkUnsupportedOperationException.apply();
+    }
+
+    @Override
+    public void readValue(
+        int offset,
+        WritableColumnVector values,
+        VectorizedValuesReader valuesReader) {
+      throw SparkUnsupportedOperationException.apply();
+    }
+
+    @Override
+    public void decodeSingleDictionaryId(
+        int offset,
+        WritableColumnVector values,
+        WritableColumnVector dictionaryIds,
+        Dictionary dictionary) {
+      throw SparkUnsupportedOperationException.apply();
+    }
+  }
+
   private static class BooleanUpdater implements ParquetVectorUpdater {
     @Override
     public void readValues(
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 2f64ffb42aa0..42454b283d09 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -111,12 +111,14 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putNull(int rowId) {
+    if (isAllNull()) return; // Skip writing nulls to all-null vector.
     Platform.putByte(null, nulls + rowId, (byte) 1);
     ++numNulls;
   }
 
   @Override
   public void putNulls(int rowId, int count) {
+    if (isAllNull()) return; // Skip writing nulls to all-null vector.
     long offset = nulls + rowId;
     for (int i = 0; i < count; ++i, ++offset) {
       Platform.putByte(null, offset, (byte) 1);
@@ -135,7 +137,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public boolean isNullAt(int rowId) {
-    return isAllNull || Platform.getByte(null, nulls + rowId) == 1;
+    return isAllNull() || Platform.getByte(null, nulls + rowId) == 1;
   }
 
   //
@@ -603,6 +605,8 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
+    if (isAllNull()) return; // Skip allocation for all-null vector.
+
     int oldCapacity = (nulls == 0L) ? 0 : capacity;
     if (isArray() || type instanceof MapType) {
       this.lengthData =
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index cd8d0b688bed..401e499fee30 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -108,12 +108,14 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public void putNull(int rowId) {
+    if (isAllNull()) return; // Skip writing nulls to all-null vector.
     nulls[rowId] = (byte)1;
     ++numNulls;
   }
 
   @Override
   public void putNulls(int rowId, int count) {
+    if (isAllNull()) return; // Skip writing nulls to all-null vector.
     for (int i = 0; i < count; ++i) {
       nulls[rowId + i] = (byte)1;
     }
@@ -130,7 +132,7 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
 
   @Override
   public boolean isNullAt(int rowId) {
-    return isAllNull || nulls[rowId] == 1;
+    return isAllNull() || nulls[rowId] == 1;
   }
 
   //
@@ -577,6 +579,8 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   // Spilt this function out since it is the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
+    if (isAllNull()) return; // Skip allocation for all-null vector.
+
     if (isArray() || type instanceof MapType) {
       int[] newLengths = new int[newCapacity];
       int[] newOffsets = new int[newCapacity];
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 3f552679bb6f..c4f06e07911d 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -59,7 +59,7 @@ public abstract class WritableColumnVector extends 
ColumnVector {
    * Resets this column for writing. The currently stored values are no longer 
accessible.
    */
   public void reset() {
-    if (isConstant || isAllNull) return;
+    if (isConstant || isAllNull()) return;
 
     if (childColumns != null) {
       for (WritableColumnVector c: childColumns) {
@@ -142,7 +142,7 @@ public abstract class WritableColumnVector extends 
ColumnVector {
 
   @Override
   public boolean hasNull() {
-    return isAllNull || numNulls > 0;
+    return isAllNull() || numNulls > 0;
   }
 
   @Override
@@ -876,17 +876,24 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   }
 
   /**
-   * Marks this column only contains null values.
+   * Marks this column missing from the file.
    */
-  public final void setAllNull() {
-    isAllNull = true;
+  public final void setMissing() {
+    isMissing = true;
+  }
+
+  /**
+   * Whether this column is missing from the file.
+   */
+  public final boolean isMissing() {
+    return isMissing;
   }
 
   /**
    * Whether this column only contains null values.
    */
   public final boolean isAllNull() {
-    return isAllNull;
+    return isMissing || type instanceof NullType;
   }
 
   /**
@@ -921,10 +928,10 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   protected boolean isConstant;
 
   /**
-   * True if this column only contains nulls. This means the column values 
never change, even
-   * across resets. Comparing to 'isConstant' above, this doesn't require any 
allocation of space.
+   * True if this column is missing from the file. This means the column 
values never change and are
+   * nulls, even across resets. This doesn't require any allocation of space.
    */
-  protected boolean isAllNull;
+  protected boolean isMissing;
 
   /**
    * Default size of each array length value. This grows as necessary.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 4cc3fe61d22b..08e545cb8c20 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -408,7 +408,7 @@ class ParquetFileFormat
   }
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
-    case _: AtomicType => true
+    case _: AtomicType | _: NullType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 25efd326f23a..7ee5b4d224b3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.api.{InitContext, 
ReadSupport}
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.io.api.RecordMaterializer
 import org.apache.parquet.schema._
-import 
org.apache.parquet.schema.LogicalTypeAnnotation.{ListLogicalTypeAnnotation, 
MapKeyValueTypeAnnotation, MapLogicalTypeAnnotation}
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.{ListLogicalTypeAnnotation, 
MapKeyValueTypeAnnotation, MapLogicalTypeAnnotation, 
UnknownLogicalTypeAnnotation}
 import org.apache.parquet.schema.Type.Repetition
 
 import org.apache.spark.internal.Logging
@@ -562,6 +562,8 @@ object ParquetReadSupport extends Logging {
           }
         case primitiveType: PrimitiveType =>
           val cost = primitiveType.getPrimitiveTypeName match {
+            case _ if primitiveType.getLogicalTypeAnnotation
+              .isInstanceOf[UnknownLogicalTypeAnnotation] => 0 // NullType is 
always preferred
             case PrimitiveType.PrimitiveTypeName.BOOLEAN => 1
             case PrimitiveType.PrimitiveTypeName.INT32 => 4
             case PrimitiveType.PrimitiveTypeName.INT64 => 8
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index f9d50bf28ea8..d708a19dd1ac 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -315,6 +315,13 @@ private[parquet] class ParquetRowConverter(
     }
 
     catalystType match {
+      case NullType
+        if 
parquetType.getLogicalTypeAnnotation.isInstanceOf[UnknownLogicalTypeAnnotation] 
=>
+        val parentUpdater = updater
+        // A converter that throws upon any add... call, as we don't expect 
any value for NullType.
+        new PrimitiveConverter with HasParentContainerUpdater {
+          override def updater: ParentContainerUpdater = parentUpdater
+        }
       case LongType if isUnsignedIntTypeMatched(32) =>
         new ParquetPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit =
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 947c021c1bd3..76e24eb03f38 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -246,7 +246,9 @@ class ParquetToSparkSchemaConverter(
       DecimalType(precision, scale)
     }
 
-    val sparkType = sparkReadType.getOrElse(typeName match {
+    val isUnknownType = 
typeAnnotation.isInstanceOf[UnknownLogicalTypeAnnotation]
+    val nullTypeOpt = Option.when(isUnknownType)(NullType)
+    val sparkType = sparkReadType.orElse(nullTypeOpt).getOrElse(typeName match 
{
       case BOOLEAN => BooleanType
 
       case FLOAT => FloatType
@@ -836,6 +838,10 @@ class SparkToParquetSchemaConverter(
       case udt: UserDefinedType[_] =>
         convertField(field.copy(dataType = udt.sqlType), inShredded)
 
+      case NullType => // Selected primitive type here doesn't have 
significance.
+        Types.primitive(BOOLEAN, repetition).named(field.name)
+          .withLogicalTypeAnnotation(LogicalTypeAnnotation.unknownType())
+
       case _ =>
         throw 
QueryCompilationErrors.cannotConvertDataTypeToParquetTypeError(field)
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 65a77322549f..80a32711c50a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -45,7 +45,7 @@ import 
org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, Outpu
 import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.internal.SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED
-import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, MapType, 
StructField, StructType, UserDefinedType, VariantType}
+import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, MapType, 
NullType, StructField, StructType, UserDefinedType, VariantType}
 import org.apache.spark.util.ArrayImplicits._
 
 object ParquetUtils extends Logging {
@@ -209,6 +209,8 @@ object ParquetUtils extends Logging {
   def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match 
{
     case _: AtomicType =>
       true
+    case _: NullType =>
+      sqlConf.parquetVectorizedReaderNullTypeEnabled
     case at: ArrayType =>
       sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
         isBatchReadSupported(sqlConf, at.elementType)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 2ab9fb64da43..dcaf88fa8dfd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
 import org.apache.parquet.io.api.{Binary, RecordConsumer}
 
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, 
SparkUnsupportedOperationException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY, 
SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, 
SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -192,6 +192,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] 
with Logging {
   // schema. This affects how timestamp values are written.
   private def makeWriter(dataType: DataType, inShredded: Boolean): ValueWriter 
= {
     dataType match {
+      case NullType => // No values of NullType should ever be written, as all 
values are null.
+        (_: SpecializedGetters, _: Int) => throw 
SparkUnsupportedOperationException()
+
       case BooleanType =>
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addBoolean(row.getBoolean(ordinal))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 35bbc6c8a1f4..95e86fe43119 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -565,10 +565,10 @@ class FileBasedDataSourceSuite extends QueryTest
     }
   }
 
-  test("SPARK-24204 error handling for unsupported Null data types - csv, 
parquet, orc") {
+  test("SPARK-24204 error handling for unsupported Null data types - csv, 
orc") {
     Seq(true, false).foreach { useV1 =>
       val useV1List = if (useV1) {
-        "csv,orc,parquet"
+        "csv,orc"
       } else {
         ""
       }
@@ -576,7 +576,7 @@ class FileBasedDataSourceSuite extends QueryTest
         withTempDir { dir =>
           val tempDir = new File(dir, "files").getCanonicalPath
 
-          Seq("parquet", "csv", "orc").foreach { format =>
+          Seq("csv", "orc").foreach { format =>
             // write path
             checkError(
               exception = intercept[AnalysisException] {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index c237f5cc42fb..3072657a0954 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -296,6 +296,31 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
     }
   }
 
+  test("SPARK-54220: NullType") {
+    val data = (1 to 5)
+      .map(_ => Row(null, Row(null, null), Seq(null, null), Map(Row(null) -> 
null))).asJava
+    val dataSchema = new StructType()
+      .add("_1", NullType)
+      .add("_2", new StructType()
+        .add("_1", NullType)
+        .add("_2", NullType))
+      .add("_3", ArrayType(NullType, containsNull = true))
+      .add("_4", MapType(new StructType().add("_1", NullType), NullType, 
valueContainsNull = true))
+    val expected = data.asScala.toArray
+
+    withAllParquetWriters {
+      withTempPath { path =>
+        val file = path.getCanonicalPath
+        spark.createDataFrame(data, dataSchema).write.parquet(file)
+
+        withAllParquetReaders {
+          val df = spark.read.parquet(file)
+          checkAnswer(df, expected)
+        }
+      }
+    }
+  }
+
   testStandardAndLegacyModes("map") {
     val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
     checkParquetFile(data)
@@ -928,6 +953,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
     }
   }
 
+  test("SPARK-54220: vectorized reader: missing all struct fields, struct with 
NullType only") {
+    val data = Seq(
+      Tuple1((null, null)),
+      Tuple1((null, null)),
+      Tuple1(null)
+    )
+    val readSchema = new StructType().add("_1",
+      new StructType()
+          .add("_3", IntegerType, nullable = true)
+          .add("_4", StringType, nullable = true),
+      nullable = true)
+    val expectedAnswer = Row(Row(null, null)) :: Row(Row(null, null)) :: 
Row(null) :: Nil
+
+    withParquetFile(data) { file =>
+      for (offheapEnabled <- Seq(true, false)) {
+        withSQLConf(
+            SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> 
"true",
+            
SQLConf.LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING.key -> "false",
+            SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> 
offheapEnabled.toString) {
+          withAllParquetReaders {
+            val df = spark.read.schema(readSchema).parquet(file)
+            val scanNode = df.queryExecution.executedPlan.collectLeaves().head
+            if (scanNode.supportsColumnar) {
+              VerifyNoAdditionalScanOutputExec(scanNode).execute().collect()
+            }
+            checkAnswer(df, expectedAnswer)
+          }
+        }
+      }
+    }
+  }
+
   test("vectorized reader: missing some struct fields") {
     Seq(true, false).foreach { offheapEnabled =>
       withSQLConf(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index d256752c287c..56076175d60e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -3370,6 +3370,40 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
            |}
          """).stripMargin,
       returnNullStructIfAllFieldsMissing = returnNullStructIfAllFieldsMissing)
+
+    testSchemaClipping(
+      s"SPARK-54220: missing struct with NullType, " +
+        
s"returnNullStructIfAllFieldsMissing=$returnNullStructIfAllFieldsMissing",
+      parquetSchema =
+        """message root {
+          |  optional group _1 {
+          |    optional int32 _1;
+          |    optional boolean _2;
+          |    optional group _3 {
+          |      optional int64 _1 (UNKNOWN);
+          |    }
+          |  }
+          |}
+        """.stripMargin,
+      catalystSchema = new StructType()
+        .add("_1", new StructType()
+          .add("_101", IntegerType)
+          .add("_102", LongType)),
+      expectedSchema =
+        ("""message root {
+           |  optional group _1 {
+           |    optional int32 _101;
+           |    optional int64 _102;""" + (if 
(!returnNullStructIfAllFieldsMissing) {
+         """
+           |    optional group _3 {
+           |      optional int64 _1 (UNKNOWN);
+           |    }
+           |  }""" } else { "" }) +
+         """
+           |  }
+           |}
+         """).stripMargin,
+      returnNullStructIfAllFieldsMissing = returnNullStructIfAllFieldsMissing)
   }
 
   testSchemaClipping(
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index caa4ca4581b4..baafdc1ea50a 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2588,16 +2588,8 @@ class HiveDDLSuite
   test("SPARK-36241: support creating tables with void datatype") {
     // CTAS with void type
     withTable("t1", "t2", "t3") {
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("CREATE TABLE t1 USING PARQUET AS SELECT NULL AS null_col")
-        },
-        condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
-        parameters = Map(
-          "columnName" -> "`null_col`",
-          "columnType" -> "\"VOID\"",
-          "format" -> "Parquet")
-      )
+      sql("CREATE TABLE t1 USING PARQUET AS SELECT NULL AS null_col")
+      checkAnswer(sql("SELECT * FROM t1"), Row(null))
 
       checkError(
         exception = intercept[AnalysisException] {
@@ -2615,15 +2607,8 @@ class HiveDDLSuite
 
     // Create table with void type
     withTable("t1", "t2", "t3", "t4") {
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("CREATE TABLE t1 (v VOID) USING PARQUET")
-        },
-        condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
-        parameters = Map(
-          "columnName" -> "`v`",
-          "columnType" -> "\"VOID\"",
-          "format" -> "Parquet"))
+      sql("CREATE TABLE t1 (v VOID) USING PARQUET")
+      checkAnswer(sql("SELECT * FROM t1"), Seq.empty)
 
       checkError(
         exception = intercept[AnalysisException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to