voonhous commented on code in PR #17833:
URL: https://github.com/apache/hudi/pull/17833#discussion_r2717576571


##########
hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala:
##########
@@ -196,4 +198,88 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging {
      storageConf.getBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis),
      getRebaseSpec("CORRECTED"))
   }
+
+  override def getVariantDataType: Option[DataType] = {
+    Some(VariantType)
+  }
+
+  override def isDataTypeEqualForParquet(requiredType: DataType, fileType: DataType): Option[Boolean] = {
+    import org.apache.spark.sql.types.{BinaryType, StructType, VariantType}
+
+    /**
+     * Checks if a StructType is the physical representation of VariantType in Parquet.
+     * VariantType is stored in Parquet as a struct with two binary fields: "value" and "metadata".
+     */
+    def isVariantPhysicalSchema(structType: StructType): Boolean = {
+      if (structType.fields.length != 2) {
+        false
+      } else {
+        val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
+        fieldMap.contains("value") && fieldMap.contains("metadata") &&
+          fieldMap("value") == BinaryType && fieldMap("metadata") == BinaryType
+      }
+    }
+
+    // Handle VariantType comparisons
+    (requiredType, fileType) match {
+      case (_: VariantType, s: StructType) if isVariantPhysicalSchema(s) => Some(true)

Review Comment:
   Files written before Spark 4.0 (or by older Hudi versions) have the struct 
representation. 
   When reading the Parquet file's schema directly (without Spark's logical 
type inference), we also get the physical struct type.
   
   IIRC, this piece of code is here to address the issue of variant columns being read out as base64 strings:
   
   Our `Spark40ParquetReader` (and its counterparts for other Spark versions) implicitly compares the schema requested by the user, i.e. `requestedSchema`, against the `fileSchema` to build projections as a minor optimization.
   
   With the discrepancy between how `Variant` is represented as a `DataType` versus as a `MessageType`, `HoodieParquetFileFormatHelper` builds an `implicitTypeChangeInfo` map that looks something like this for the `unsafeProjection`:
   
   ```
   ({Integer@27093}7 -> {ImmutablePair@27094}{VariantType$@27095}VariantType -> 
{StructType@27096}size = 2)
   indexOfField -> requestedSchema -> fileSchema
   ```
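
   As a minimal, self-contained sketch of the shape of that map (not the actual Hudi/Spark code; all `Sketch*` names below are illustrative stand-ins for Spark's `DataType` hierarchy), each entry maps a field index to the pair (requested logical type, physical type found in the file):

   ```scala
   // Illustrative stand-ins for Spark's DataType hierarchy; not the real classes.
   sealed trait SketchDataType
   case object SketchVariantType extends SketchDataType
   case object SketchBinaryType extends SketchDataType
   final case class SketchStructType(fields: Map[String, SketchDataType]) extends SketchDataType

   object TypeChangeSketch {
     // Physical Parquet layout of a Variant column: struct<value: binary, metadata: binary>
     val variantPhysical: SketchStructType = SketchStructType(Map(
       "value" -> SketchBinaryType,
       "metadata" -> SketchBinaryType))

     // indexOfField -> (requestedSchema type, fileSchema type), mirroring the snapshot above
     val implicitTypeChangeInfo: Map[Int, (SketchDataType, SketchDataType)] =
       Map(7 -> (SketchVariantType, variantPhysical))

     def main(args: Array[String]): Unit = {
       val (requested, file) = implicitTypeChangeInfo(7)
       println(s"field 7: requested=$requested, file=$file")
     }
   }
   ```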
   
   This `unsafeProjection` causes bytes in the variant to be read out as:
   ```
   
{"metadata":"AQIAAwdrZXlsaXN0","value":"AgIAAQAHExl2YWx1ZTIDAwACBAYMAQwCDAM="}
   ```
   
   instead of the technically-equivalent raw form below; the output above is in base64 string form, which impedes further evaluation/representation of the Variant:
   
   ```
   {'value': 
b'\x02\x02\x00\x01\x00\x07\x13\x19value2\x03\x03\x00\x02\x04\x06\x0c\x01\x0c\x02\x0c\x03',
 'metadata': b'\x01\x02\x00\x03\x07keylist'}
   ```
   
   
   With proper Variant support, these bytes would in turn be decoded and rendered as:
   
   ```
   {"key":"value2","list":[1,2,3]}
   ```
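
   A quick way to convince yourself the two dumps above are the same bytes is to Base64-decode the JSON-rendered output (a stand-alone sketch, not Hudi code):

   ```scala
   import java.nio.charset.StandardCharsets
   import java.util.Base64

   object VariantBytesCheck {
     def main(args: Array[String]): Unit = {
       // The base64 strings from the JSON-rendered projection output above
       val metadata = Base64.getDecoder.decode("AQIAAwdrZXlsaXN0")
       val value    = Base64.getDecoder.decode("AgIAAQAHExl2YWx1ZTIDAwACBAYMAQwCDAM=")

       // The metadata dictionary ends with the concatenated key strings "key" + "list"
       assert(new String(metadata, StandardCharsets.UTF_8).endsWith("keylist"))
       // The value bytes embed the string "value2" from {"key":"value2","list":[1,2,3]}
       assert(new String(value, StandardCharsets.UTF_8).contains("value2"))
       println("base64 form and raw-bytes form carry the same bytes")
     }
   }
   ```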
   
   This only affects `HoodieRecordType.Spark`.
   
   That's why we need this check here in Spark 4.0, and why the `fileType` is a `StructType`.
   
   Hope this makes sense, especially given the in-memory snapshot of the `unsafeProjection` contents copied out above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
