This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c04e095779f4 [SPARK-54220][SQL] NullType/VOID/UNKNOWN Type Support in Parquet
c04e095779f4 is described below
commit c04e095779f4c9d02152d1db3a510ee21905688f
Author: Ziya Mukhtarov <[email protected]>
AuthorDate: Fri Nov 7 10:29:52 2025 -0800
[SPARK-54220][SQL] NullType/VOID/UNKNOWN Type Support in Parquet
### What changes were proposed in this pull request?
This PR adds support for reading/writing NullType columns in Parquet files
via the `UNKNOWN` logical type annotation. Notable changes are:
- Changing `ParquetFileFormat.supportDataType` to support NullType
- Changing `ParquetToSparkSchemaConverter` to infer NullType if a primitive type has the `UNKNOWN` type annotation and there is no Spark-provided expected type
- Changing `SparkToParquetSchemaConverter` to convert NullType into a Parquet Boolean physical type with the `UNKNOWN` annotation (the physical type chosen here is arbitrary; see the sketch after this list)
- Optimizing `On/OffHeapColumnVector` to skip memory allocation when the whole vector is guaranteed to hold only nulls, which is the case for NullType columns and for columns that are missing from the file.
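To make the schema mapping concrete, here is a minimal sketch (not part of the patch) of what a NullType column looks like in a Parquet schema after this change, built with parquet-mr's `Types` builder the same way the converter does. The column name `null_col` and the message name `spark_schema` are placeholders chosen for this example.

```scala
import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN
import org.apache.parquet.schema.Type.Repetition.OPTIONAL

// A NullType column is written as an optional BOOLEAN primitive carrying the
// UNKNOWN logical type annotation; the BOOLEAN physical type has no meaning
// because no values are ever written for such a column.
val nullCol = Types.primitive(BOOLEAN, OPTIONAL)
  .named("null_col")
  .withLogicalTypeAnnotation(LogicalTypeAnnotation.unknownType())

val schema: MessageType = Types.buildMessage()
  .addField(nullCol)
  .named("spark_schema")

// Prints roughly: message spark_schema { optional boolean null_col (UNKNOWN); }
println(schema)
```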
### Why are the changes needed?
To support reading/writing NullType columns in Parquet files.
### Does this PR introduce _any_ user-facing change?
Yes. Previously, trying to read or write Parquet files whose schema contained a NullType column would throw an error. Now we no longer throw and instead read/write the data as expected, using the `UNKNOWN` type annotation.
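For example, a minimal sketch of the new round-trip behavior (the output path is a placeholder; `spark` is an active SparkSession, as in spark-shell):

```scala
// Before this change, the write below failed with UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE.
val df = spark.sql("SELECT 1 AS id, NULL AS null_col")  // null_col has type VOID (NullType)
df.write.parquet("/tmp/spark-54220-demo")

val roundTripped = spark.read.parquet("/tmp/spark-54220-demo")
roundTripped.printSchema()  // null_col is read back as void
roundTripped.show()         // one row: id = 1, null_col = NULL
```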
### How was this patch tested?
Manually verified that the Parquet files we write are readable by Apache Arrow. Also verified that we can read a simple Parquet file with a NullType column written by Apache Arrow. Added a new unit test. Fixed existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52922 from ZiyaZa/parquet-unknown.
Authored-by: Ziya Mukhtarov <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 10 ++++
.../datasources/parquet/ParquetColumnVector.java | 6 +--
.../parquet/ParquetVectorUpdaterFactory.java | 44 ++++++++++++++++-
.../execution/vectorized/OffHeapColumnVector.java | 6 ++-
.../execution/vectorized/OnHeapColumnVector.java | 6 ++-
.../execution/vectorized/WritableColumnVector.java | 25 ++++++----
.../datasources/parquet/ParquetFileFormat.scala | 2 +-
.../datasources/parquet/ParquetReadSupport.scala | 4 +-
.../datasources/parquet/ParquetRowConverter.scala | 7 +++
.../parquet/ParquetSchemaConverter.scala | 8 ++-
.../datasources/parquet/ParquetUtils.scala | 4 +-
.../datasources/parquet/ParquetWriteSupport.scala | 5 +-
.../spark/sql/FileBasedDataSourceSuite.scala | 6 +--
.../datasources/parquet/ParquetIOSuite.scala | 57 ++++++++++++++++++++++
.../datasources/parquet/ParquetSchemaSuite.scala | 34 +++++++++++++
.../spark/sql/hive/execution/HiveDDLSuite.scala | 23 ++-------
16 files changed, 205 insertions(+), 42 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 64cb365e5822..010003f37c52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1552,6 +1552,13 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val PARQUET_VECTORIZED_READER_NULL_TYPE_ENABLED =
+ buildConf("spark.sql.parquet.enableNullTypeVectorizedReader")
+ .doc("Enables vectorized Parquet reader support for NullType columns.")
+ .version("4.1.0")
+ .booleanConf
+ .createWithDefault(true)
+
val PARQUET_RECORD_FILTER_ENABLED =
buildConf("spark.sql.parquet.recordLevelFilter.enabled")
.doc("If true, enables Parquet's native record-level filtering using the
pushed down " +
"filters. " +
@@ -6915,6 +6922,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def parquetVectorizedReaderNestedColumnEnabled: Boolean =
getConf(PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED)
+ def parquetVectorizedReaderNullTypeEnabled: Boolean =
+ getConf(PARQUET_VECTORIZED_READER_NULL_TYPE_ENABLED)
+
def parquetVectorizedReaderBatchSize: Int =
getConf(PARQUET_VECTORIZED_READER_BATCH_SIZE)
def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index 37c936c84d5f..002b7569a6e0 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -80,7 +80,7 @@ final class ParquetColumnVector {
}
if (defaultValue == null) {
- vector.setAllNull();
+ vector.setMissing();
return;
}
// For Parquet tables whose columns have associated DEFAULT values, this reader must return
@@ -137,7 +137,7 @@ final class ParquetColumnVector {
// Only use levels from non-missing child, this can happen if only some but not all
// fields of a struct are missing.
- if (!childCv.vector.isAllNull()) {
+ if (!childCv.vector.isMissing()) {
allChildrenAreMissing = false;
this.repetitionLevels = childCv.repetitionLevels;
this.definitionLevels = childCv.definitionLevels;
@@ -147,7 +147,7 @@ final class ParquetColumnVector {
// This can happen if all the fields of a struct are missing, in which case we should mark
// the struct itself as a missing column
if (allChildrenAreMissing) {
- vector.setAllNull();
+ vector.setMissing();
}
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index eb6c84b8113b..4f90f878da86 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -26,6 +26,7 @@ import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+import org.apache.parquet.schema.LogicalTypeAnnotation.UnknownLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.spark.SparkUnsupportedOperationException;
@@ -70,7 +71,12 @@ public class ParquetVectorUpdaterFactory {
}
public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType sparkType) {
- PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
+ PrimitiveType type = descriptor.getPrimitiveType();
+ PrimitiveType.PrimitiveTypeName typeName = type.getPrimitiveTypeName();
+ boolean isUnknownType = type.getLogicalTypeAnnotation() instanceof UnknownLogicalTypeAnnotation;
+ if (isUnknownType && sparkType instanceof NullType) {
+ return new NullTypeUpdater();
+ }
switch (typeName) {
case BOOLEAN -> {
@@ -244,6 +250,42 @@ public class ParquetVectorUpdaterFactory {
!annotation.isSigned() && annotation.getBitWidth() == bitWidth;
}
+ /**
+ * Updater should not be called if all values are nulls, so all methods throw exception here.
+ */
+ private static class NullTypeUpdater implements ParquetVectorUpdater {
+ @Override
+ public void readValues(
+ int total,
+ int offset,
+ WritableColumnVector values,
+ VectorizedValuesReader valuesReader) {
+ throw SparkUnsupportedOperationException.apply();
+ }
+
+ @Override
+ public void skipValues(int total, VectorizedValuesReader valuesReader) {
+ throw SparkUnsupportedOperationException.apply();
+ }
+
+ @Override
+ public void readValue(
+ int offset,
+ WritableColumnVector values,
+ VectorizedValuesReader valuesReader) {
+ throw SparkUnsupportedOperationException.apply();
+ }
+
+ @Override
+ public void decodeSingleDictionaryId(
+ int offset,
+ WritableColumnVector values,
+ WritableColumnVector dictionaryIds,
+ Dictionary dictionary) {
+ throw SparkUnsupportedOperationException.apply();
+ }
+ }
+
private static class BooleanUpdater implements ParquetVectorUpdater {
@Override
public void readValues(
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 2f64ffb42aa0..42454b283d09 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -111,12 +111,14 @@ public final class OffHeapColumnVector extends WritableColumnVector {
@Override
public void putNull(int rowId) {
+ if (isAllNull()) return; // Skip writing nulls to all-null vector.
Platform.putByte(null, nulls + rowId, (byte) 1);
++numNulls;
}
@Override
public void putNulls(int rowId, int count) {
+ if (isAllNull()) return; // Skip writing nulls to all-null vector.
long offset = nulls + rowId;
for (int i = 0; i < count; ++i, ++offset) {
Platform.putByte(null, offset, (byte) 1);
@@ -135,7 +137,7 @@ public final class OffHeapColumnVector extends WritableColumnVector {
@Override
public boolean isNullAt(int rowId) {
- return isAllNull || Platform.getByte(null, nulls + rowId) == 1;
+ return isAllNull() || Platform.getByte(null, nulls + rowId) == 1;
}
//
@@ -603,6 +605,8 @@ public final class OffHeapColumnVector extends WritableColumnVector {
// Split out the slow path.
@Override
protected void reserveInternal(int newCapacity) {
+ if (isAllNull()) return; // Skip allocation for all-null vector.
+
int oldCapacity = (nulls == 0L) ? 0 : capacity;
if (isArray() || type instanceof MapType) {
this.lengthData =
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index cd8d0b688bed..401e499fee30 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -108,12 +108,14 @@ public final class OnHeapColumnVector extends WritableColumnVector {
@Override
public void putNull(int rowId) {
+ if (isAllNull()) return; // Skip writing nulls to all-null vector.
nulls[rowId] = (byte)1;
++numNulls;
}
@Override
public void putNulls(int rowId, int count) {
+ if (isAllNull()) return; // Skip writing nulls to all-null vector.
for (int i = 0; i < count; ++i) {
nulls[rowId + i] = (byte)1;
}
@@ -130,7 +132,7 @@ public final class OnHeapColumnVector extends WritableColumnVector {
@Override
public boolean isNullAt(int rowId) {
- return isAllNull || nulls[rowId] == 1;
+ return isAllNull() || nulls[rowId] == 1;
}
//
@@ -577,6 +579,8 @@ public final class OnHeapColumnVector extends WritableColumnVector {
// Spilt this function out since it is the slow path.
@Override
protected void reserveInternal(int newCapacity) {
+ if (isAllNull()) return; // Skip allocation for all-null vector.
+
if (isArray() || type instanceof MapType) {
int[] newLengths = new int[newCapacity];
int[] newOffsets = new int[newCapacity];
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index 3f552679bb6f..c4f06e07911d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -59,7 +59,7 @@ public abstract class WritableColumnVector extends ColumnVector {
* Resets this column for writing. The currently stored values are no longer accessible.
*/
public void reset() {
- if (isConstant || isAllNull) return;
+ if (isConstant || isAllNull()) return;
if (childColumns != null) {
for (WritableColumnVector c: childColumns) {
@@ -142,7 +142,7 @@ public abstract class WritableColumnVector extends ColumnVector {
@Override
public boolean hasNull() {
- return isAllNull || numNulls > 0;
+ return isAllNull() || numNulls > 0;
}
@Override
@@ -876,17 +876,24 @@ public abstract class WritableColumnVector extends ColumnVector {
}
/**
- * Marks this column only contains null values.
+ * Marks this column missing from the file.
*/
- public final void setAllNull() {
- isAllNull = true;
+ public final void setMissing() {
+ isMissing = true;
+ }
+
+ /**
+ * Whether this column is missing from the file.
+ */
+ public final boolean isMissing() {
+ return isMissing;
}
/**
* Whether this column only contains null values.
*/
public final boolean isAllNull() {
- return isAllNull;
+ return isMissing || type instanceof NullType;
}
/**
@@ -921,10 +928,10 @@ public abstract class WritableColumnVector extends ColumnVector {
protected boolean isConstant;
/**
- * True if this column only contains nulls. This means the column values never change, even
- * across resets. Comparing to 'isConstant' above, this doesn't require any allocation of space.
+ * True if this column is missing from the file. This means the column values never change and are
+ * nulls, even across resets. This doesn't require any allocation of space.
*/
- protected boolean isAllNull;
+ protected boolean isMissing;
/**
* Default size of each array length value. This grows as necessary.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 4cc3fe61d22b..08e545cb8c20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -408,7 +408,7 @@ class ParquetFileFormat
}
override def supportDataType(dataType: DataType): Boolean = dataType match {
- case _: AtomicType => true
+ case _: AtomicType | _: NullType => true
case st: StructType => st.forall { f => supportDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 25efd326f23a..7ee5b4d224b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -28,7 +28,7 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
import org.apache.parquet.io.api.RecordMaterializer
import org.apache.parquet.schema._
-import org.apache.parquet.schema.LogicalTypeAnnotation.{ListLogicalTypeAnnotation, MapKeyValueTypeAnnotation, MapLogicalTypeAnnotation}
+import org.apache.parquet.schema.LogicalTypeAnnotation.{ListLogicalTypeAnnotation, MapKeyValueTypeAnnotation, MapLogicalTypeAnnotation, UnknownLogicalTypeAnnotation}
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.internal.Logging
@@ -562,6 +562,8 @@ object ParquetReadSupport extends Logging {
}
case primitiveType: PrimitiveType =>
val cost = primitiveType.getPrimitiveTypeName match {
+ case _ if primitiveType.getLogicalTypeAnnotation
+ .isInstanceOf[UnknownLogicalTypeAnnotation] => 0 // NullType is always preferred
case PrimitiveType.PrimitiveTypeName.BOOLEAN => 1
case PrimitiveType.PrimitiveTypeName.INT32 => 4
case PrimitiveType.PrimitiveTypeName.INT64 => 8
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index f9d50bf28ea8..d708a19dd1ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -315,6 +315,13 @@ private[parquet] class ParquetRowConverter(
}
catalystType match {
+ case NullType
+ if parquetType.getLogicalTypeAnnotation.isInstanceOf[UnknownLogicalTypeAnnotation] =>
+ val parentUpdater = updater
+ // A converter that throws upon any add... call, as we don't expect any value for NullType.
+ new PrimitiveConverter with HasParentContainerUpdater {
+ override def updater: ParentContainerUpdater = parentUpdater
+ }
case LongType if isUnsignedIntTypeMatched(32) =>
new ParquetPrimitiveConverter(updater) {
override def addInt(value: Int): Unit =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 947c021c1bd3..76e24eb03f38 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -246,7 +246,9 @@ class ParquetToSparkSchemaConverter(
DecimalType(precision, scale)
}
- val sparkType = sparkReadType.getOrElse(typeName match {
+ val isUnknownType = typeAnnotation.isInstanceOf[UnknownLogicalTypeAnnotation]
+ val nullTypeOpt = Option.when(isUnknownType)(NullType)
+ val sparkType = sparkReadType.orElse(nullTypeOpt).getOrElse(typeName match {
case BOOLEAN => BooleanType
case FLOAT => FloatType
@@ -836,6 +838,10 @@ class SparkToParquetSchemaConverter(
case udt: UserDefinedType[_] =>
convertField(field.copy(dataType = udt.sqlType), inShredded)
+ case NullType => // Selected primitive type here doesn't have significance.
+ Types.primitive(BOOLEAN, repetition).named(field.name)
+ .withLogicalTypeAnnotation(LogicalTypeAnnotation.unknownType())
+
case _ =>
throw QueryCompilationErrors.cannotConvertDataTypeToParquetTypeError(field)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
index 65a77322549f..80a32711c50a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.execution.datasources.{AggregatePushDownUtils, Outpu
import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.internal.SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED
-import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, MapType, StructField, StructType, UserDefinedType, VariantType}
+import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, MapType, NullType, StructField, StructType, UserDefinedType, VariantType}
import org.apache.spark.util.ArrayImplicits._
object ParquetUtils extends Logging {
@@ -209,6 +209,8 @@ object ParquetUtils extends Logging {
def isBatchReadSupported(sqlConf: SQLConf, dt: DataType): Boolean = dt match {
case _: AtomicType =>
true
+ case _: NullType =>
+ sqlConf.parquetVectorizedReaderNullTypeEnabled
case at: ArrayType =>
sqlConf.parquetVectorizedReaderNestedColumnEnabled &&
isBatchReadSupported(sqlConf, at.elementType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index 2ab9fb64da43..dcaf88fa8dfd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -29,7 +29,7 @@ import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
import org.apache.parquet.io.api.{Binary, RecordConsumer}
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, SparkUnsupportedOperationException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.catalyst.InternalRow
@@ -192,6 +192,9 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
// schema. This affects how timestamp values are written.
private def makeWriter(dataType: DataType, inShredded: Boolean): ValueWriter = {
dataType match {
+ case NullType => // No values of NullType should ever be written, as all values are null.
+ (_: SpecializedGetters, _: Int) => throw SparkUnsupportedOperationException()
+
case BooleanType =>
(row: SpecializedGetters, ordinal: Int) =>
recordConsumer.addBoolean(row.getBoolean(ordinal))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 35bbc6c8a1f4..95e86fe43119 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -565,10 +565,10 @@ class FileBasedDataSourceSuite extends QueryTest
}
}
- test("SPARK-24204 error handling for unsupported Null data types - csv,
parquet, orc") {
+ test("SPARK-24204 error handling for unsupported Null data types - csv,
orc") {
Seq(true, false).foreach { useV1 =>
val useV1List = if (useV1) {
- "csv,orc,parquet"
+ "csv,orc"
} else {
""
}
@@ -576,7 +576,7 @@ class FileBasedDataSourceSuite extends QueryTest
withTempDir { dir =>
val tempDir = new File(dir, "files").getCanonicalPath
- Seq("parquet", "csv", "orc").foreach { format =>
+ Seq("csv", "orc").foreach { format =>
// write path
checkError(
exception = intercept[AnalysisException] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index c237f5cc42fb..3072657a0954 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -296,6 +296,31 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}
+ test("SPARK-54220: NullType") {
+ val data = (1 to 5)
+ .map(_ => Row(null, Row(null, null), Seq(null, null), Map(Row(null) -> null))).asJava
+ val dataSchema = new StructType()
+ .add("_1", NullType)
+ .add("_2", new StructType()
+ .add("_1", NullType)
+ .add("_2", NullType))
+ .add("_3", ArrayType(NullType, containsNull = true))
+ .add("_4", MapType(new StructType().add("_1", NullType), NullType,
valueContainsNull = true))
+ val expected = data.asScala.toArray
+
+ withAllParquetWriters {
+ withTempPath { path =>
+ val file = path.getCanonicalPath
+ spark.createDataFrame(data, dataSchema).write.parquet(file)
+
+ withAllParquetReaders {
+ val df = spark.read.parquet(file)
+ checkAnswer(df, expected)
+ }
+ }
+ }
+ }
+
testStandardAndLegacyModes("map") {
val data = (1 to 4).map(i => Tuple1(Map(i -> s"val_$i")))
checkParquetFile(data)
@@ -928,6 +953,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
}
}
+ test("SPARK-54220: vectorized reader: missing all struct fields, struct with
NullType only") {
+ val data = Seq(
+ Tuple1((null, null)),
+ Tuple1((null, null)),
+ Tuple1(null)
+ )
+ val readSchema = new StructType().add("_1",
+ new StructType()
+ .add("_3", IntegerType, nullable = true)
+ .add("_4", StringType, nullable = true),
+ nullable = true)
+ val expectedAnswer = Row(Row(null, null)) :: Row(Row(null, null)) :: Row(null) :: Nil
+
+ withParquetFile(data) { file =>
+ for (offheapEnabled <- Seq(true, false)) {
+ withSQLConf(
+ SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
+ SQLConf.LEGACY_PARQUET_RETURN_NULL_STRUCT_IF_ALL_FIELDS_MISSING.key -> "false",
+ SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) {
+ withAllParquetReaders {
+ val df = spark.read.schema(readSchema).parquet(file)
+ val scanNode = df.queryExecution.executedPlan.collectLeaves().head
+ if (scanNode.supportsColumnar) {
+ VerifyNoAdditionalScanOutputExec(scanNode).execute().collect()
+ }
+ checkAnswer(df, expectedAnswer)
+ }
+ }
+ }
+ }
+ }
+
test("vectorized reader: missing some struct fields") {
Seq(true, false).foreach { offheapEnabled =>
withSQLConf(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index d256752c287c..56076175d60e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -3370,6 +3370,40 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
|}
""").stripMargin,
returnNullStructIfAllFieldsMissing = returnNullStructIfAllFieldsMissing)
+
+ testSchemaClipping(
+ s"SPARK-54220: missing struct with NullType, " +
+ s"returnNullStructIfAllFieldsMissing=$returnNullStructIfAllFieldsMissing",
+ parquetSchema =
+ """message root {
+ | optional group _1 {
+ | optional int32 _1;
+ | optional boolean _2;
+ | optional group _3 {
+ | optional int64 _1 (UNKNOWN);
+ | }
+ | }
+ |}
+ """.stripMargin,
+ catalystSchema = new StructType()
+ .add("_1", new StructType()
+ .add("_101", IntegerType)
+ .add("_102", LongType)),
+ expectedSchema =
+ ("""message root {
+ | optional group _1 {
+ | optional int32 _101;
| optional int64 _102;""" + (if (!returnNullStructIfAllFieldsMissing) {
+ """
+ | optional group _3 {
+ | optional int64 _1 (UNKNOWN);
+ | }
+ | }""" } else { "" }) +
+ """
+ | }
+ |}
+ """).stripMargin,
+ returnNullStructIfAllFieldsMissing = returnNullStructIfAllFieldsMissing)
}
testSchemaClipping(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index caa4ca4581b4..baafdc1ea50a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -2588,16 +2588,8 @@ class HiveDDLSuite
test("SPARK-36241: support creating tables with void datatype") {
// CTAS with void type
withTable("t1", "t2", "t3") {
- checkError(
- exception = intercept[AnalysisException] {
- sql("CREATE TABLE t1 USING PARQUET AS SELECT NULL AS null_col")
- },
- condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
- parameters = Map(
- "columnName" -> "`null_col`",
- "columnType" -> "\"VOID\"",
- "format" -> "Parquet")
- )
+ sql("CREATE TABLE t1 USING PARQUET AS SELECT NULL AS null_col")
+ checkAnswer(sql("SELECT * FROM t1"), Row(null))
checkError(
exception = intercept[AnalysisException] {
@@ -2615,15 +2607,8 @@ class HiveDDLSuite
// Create table with void type
withTable("t1", "t2", "t3", "t4") {
- checkError(
- exception = intercept[AnalysisException] {
- sql("CREATE TABLE t1 (v VOID) USING PARQUET")
- },
- condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
- parameters = Map(
- "columnName" -> "`v`",
- "columnType" -> "\"VOID\"",
- "format" -> "Parquet"))
+ sql("CREATE TABLE t1 (v VOID) USING PARQUET")
+ checkAnswer(sql("SELECT * FROM t1"), Seq.empty)
checkError(
exception = intercept[AnalysisException] {