voonhous commented on code in PR #17833:
URL: https://github.com/apache/hudi/pull/17833#discussion_r2717576571
##########
hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala:
##########
@@ -196,4 +198,88 @@ abstract class BaseSpark4Adapter extends SparkAdapter with
Logging {
storageConf.getBoolean(SQLConf.CASE_SENSITIVE.key,
sqlConf.caseSensitiveAnalysis),
getRebaseSpec("CORRECTED"))
}
+
+ override def getVariantDataType: Option[DataType] = {
+ Some(VariantType)
+ }
+
+ override def isDataTypeEqualForParquet(requiredType: DataType, fileType:
DataType): Option[Boolean] = {
+ import org.apache.spark.sql.types.{BinaryType, StructType, VariantType}
+
+ /**
+ * Checks if a StructType is the physical representation of VariantType in
Parquet.
+ * VariantType is stored in Parquet as a struct with two binary fields:
"value" and "metadata".
+ */
+ def isVariantPhysicalSchema(structType: StructType): Boolean = {
+ if (structType.fields.length != 2) {
+ false
+ } else {
+ val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
+ fieldMap.contains("value") && fieldMap.contains("metadata") &&
+ fieldMap("value") == BinaryType && fieldMap("metadata") == BinaryType
+ }
+ }
+
+ // Handle VariantType comparisons
+ (requiredType, fileType) match {
+ case (_: VariantType, s: StructType) if isVariantPhysicalSchema(s) =>
Some(true)
Review Comment:
Files written before Spark 4.0 (or by older Hudi versions) have the struct
representation.
When reading the Parquet file's schema directly (without Spark's logical
type inference), we also get the physical struct type.
IIRC, this piece of code is here to address the issue of variant columns being
read out as base64 strings:
Our `Spark40ParquetReader` (and the readers for other Spark versions)
implicitly compares the schema requested by the user, i.e. `requestedSchema`,
against the `fileSchema` to build a projection as a minor optimization.
Because `Variant` is represented differently as a `DataType` than as a Parquet
`MessageType`, `HoodieParquetFileFormatHelper` builds an
`implicitTypeChangeInfo` map for the `unsafeProjection` that looks something
like this:
```
({Integer@27093}7 -> {ImmutablePair@27094}{VariantType$@27095}VariantType ->
{StructType@27096}size = 2)
indexOfField -> requestedSchema -> fileSchema
```
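To make the shape of that debugger snapshot concrete, here is a tiny standalone sketch (not Hudi code; `DT`, `VariantDT`, `BinaryDT`, and `StructDT` are hypothetical stand-ins for Spark's `DataType` hierarchy) of an `implicitTypeChangeInfo`-style entry: field index -> (requested logical type, physical type found in the file).

```scala
// Hypothetical stand-ins for Spark's DataType classes, sketch only.
sealed trait DT
case object VariantDT extends DT
case object BinaryDT extends DT
final case class StructDT(fields: Map[String, DT]) extends DT

// The file side is the physical Variant struct: two binary fields.
val fileSideType: DT = StructDT(Map("value" -> BinaryDT, "metadata" -> BinaryDT))

// field index 7 -> (requested logical type, physical type in the file),
// mirroring: 7 -> (VariantType -> StructType(size = 2)) from the snapshot.
val implicitTypeChangeInfo: Map[Int, (DT, DT)] =
  Map(7 -> (VariantDT -> fileSideType))

println(implicitTypeChangeInfo(7))
```

Because the two sides of the pair differ, the reader wires in a projection for that field index, which is what mangles the Variant bytes below.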
This `unsafeProjection` causes bytes in the variant to be read out as:
```
{"metadata":"AQIAAwdrZXlsaXN0","value":"AgIAAQAHExl2YWx1ZTIDAwACBAYMAQwCDAM="}
```
instead of the following (which is technically equivalent, but the base64
string form above impedes any further evaluation/representation of the
Variant):
```
{'value':
b'\x02\x02\x00\x01\x00\x07\x13\x19value2\x03\x03\x00\x02\x04\x06\x0c\x01\x0c\x02\x0c\x03',
'metadata': b'\x01\x02\x00\x03\x07keylist'}
```
which should ultimately be rendered as (once Variant is properly supported):
```
{"key":"value2","list":[1,2,3]}
```
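You can verify that the two forms above carry the same bytes with a quick standalone check (plain JDK `Base64`, no Hudi/Spark involved): decoding the base64 strings recovers exactly the raw `metadata`/`value` byte arrays shown in the second snippet.

```scala
import java.util.Base64

// Decode the base64 strings produced by the unsafe projection.
val metadata: Array[Byte] = Base64.getDecoder.decode("AQIAAwdrZXlsaXN0")
val value: Array[Byte]    = Base64.getDecoder.decode("AgIAAQAHExl2YWx1ZTIDAwACBAYMAQwCDAM=")

// metadata = \x01\x02\x00\x03\x07 followed by the key text "keylist".
println(new String(metadata.drop(5), "UTF-8"))  // keylist
// value is the 26-byte Variant-encoded payload, which embeds "value2".
println(value.length)                           // 26
```

So the base64 output is not wrong data, just the raw Variant bytes re-encoded into a string, which downstream Variant evaluation cannot consume.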
This only affects `HoodieRecordType.Spark`.
That's why we need this check here in Spark 4.0, and why the `fileType` is a
`StructType`.
Hope this makes sense, especially with the in-memory debugger snapshot of the
`unsafeProjection` input copied above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]