the-other-tim-brown commented on code in PR #13652:
URL: https://github.com/apache/hudi/pull/13652#discussion_r2246146649


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala:
##########
@@ -104,6 +104,57 @@ object HoodieParquetFileFormatHelper {
                                requiredSchema: StructType,
                                partitionSchema: StructType,
                                schemaUtils: HoodieSchemaUtils): 
UnsafeProjection = {
+    val floatToDoubleCache = scala.collection.mutable.HashMap.empty[(DataType, 
DataType), Boolean]
+
+    def hasFloatToDouble(src: DataType, dst: DataType): Boolean = {
+      floatToDoubleCache.getOrElseUpdate((src, dst), {
+        (src, dst) match {
+          case (FloatType, DoubleType) => true
+          case (StructType(srcFields), StructType(dstFields)) =>
+            srcFields.zip(dstFields).exists { case (sf, df) => 
hasFloatToDouble(sf.dataType, df.dataType) }
+          case (ArrayType(sElem, _), ArrayType(dElem, _)) =>
+            hasFloatToDouble(sElem, dElem)
+          case (MapType(sKey, sVal, _), MapType(dKey, dVal, _)) =>
+            hasFloatToDouble(sKey, dKey) || hasFloatToDouble(sVal, dVal)
+          case _ => false
+        }
+      })
+    }
+
+    def repairFloatDoubleConversion(expr: Expression, srcType: DataType, 
dstType: DataType): Expression = {
+      lazy val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+      if (srcType == FloatType && dstType == DoubleType) {

Review Comment:
   nitpick: Is there a way to incorporate this in the `(srcType, dstType)` 
matching below?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala:
##########
@@ -104,6 +104,57 @@ object HoodieParquetFileFormatHelper {
                                requiredSchema: StructType,
                                partitionSchema: StructType,
                                schemaUtils: HoodieSchemaUtils): 
UnsafeProjection = {
+    val floatToDoubleCache = scala.collection.mutable.HashMap.empty[(DataType, 
DataType), Boolean]
+
+    def hasFloatToDouble(src: DataType, dst: DataType): Boolean = {
+      floatToDoubleCache.getOrElseUpdate((src, dst), {
+        (src, dst) match {
+          case (FloatType, DoubleType) => true
+          case (StructType(srcFields), StructType(dstFields)) =>
+            srcFields.zip(dstFields).exists { case (sf, df) => 
hasFloatToDouble(sf.dataType, df.dataType) }
+          case (ArrayType(sElem, _), ArrayType(dElem, _)) =>
+            hasFloatToDouble(sElem, dElem)
+          case (MapType(sKey, sVal, _), MapType(dKey, dVal, _)) =>
+            hasFloatToDouble(sKey, dKey) || hasFloatToDouble(sVal, dVal)
+          case _ => false
+        }
+      })
+    }
+
+    def repairFloatDoubleConversion(expr: Expression, srcType: DataType, 
dstType: DataType): Expression = {

Review Comment:
   This method is doing more than just float-to-double conversion, so update the naming accordingly.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala:
##########
@@ -104,6 +104,57 @@ object HoodieParquetFileFormatHelper {
                                requiredSchema: StructType,
                                partitionSchema: StructType,
                                schemaUtils: HoodieSchemaUtils): 
UnsafeProjection = {
+    val floatToDoubleCache = scala.collection.mutable.HashMap.empty[(DataType, 
DataType), Boolean]
+
+    def hasFloatToDouble(src: DataType, dst: DataType): Boolean = {
+      floatToDoubleCache.getOrElseUpdate((src, dst), {
+        (src, dst) match {
+          case (FloatType, DoubleType) => true
+          case (StructType(srcFields), StructType(dstFields)) =>
+            srcFields.zip(dstFields).exists { case (sf, df) => 
hasFloatToDouble(sf.dataType, df.dataType) }
+          case (ArrayType(sElem, _), ArrayType(dElem, _)) =>
+            hasFloatToDouble(sElem, dElem)
+          case (MapType(sKey, sVal, _), MapType(dKey, dVal, _)) =>
+            hasFloatToDouble(sKey, dKey) || hasFloatToDouble(sVal, dVal)
+          case _ => false
+        }
+      })
+    }
+
+    def repairFloatDoubleConversion(expr: Expression, srcType: DataType, 
dstType: DataType): Expression = {
+      lazy val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+      if (srcType == FloatType && dstType == DoubleType) {
+        val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else 
None)
+        Cast(toStr, dstType, if (needTimeZone) timeZoneId else None)
+      } else (srcType, dstType) match {
+        case (s: StructType, d: StructType) if hasFloatToDouble(s, d) =>
+          val structFields = s.fields.zip(d.fields).zipWithIndex.map {
+            case ((srcField, dstField), i) =>
+              val child = GetStructField(expr, i, Some(dstField.name))
+              repairFloatDoubleConversion(child, srcField.dataType, 
dstField.dataType)
+          }
+          CreateNamedStruct(d.fields.zip(structFields).flatMap {
+            case (f, c) => Seq(Literal(f.name), c)
+          })
+        case (ArrayType(sElementType, containsNull), ArrayType(dElementType, 
_)) if hasFloatToDouble(sElementType, dElementType) =>
+          val lambdaVar = NamedLambdaVariable("x", sElementType, containsNull)

Review Comment:
   nitpick: `x` could be `element` to be more consistent with how we work with 
arrays elsewhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to