0lai0 commented on code in PR #3706:
URL: https://github.com/apache/datafusion-comet/pull/3706#discussion_r3006270859


##########
spark/src/main/scala/org/apache/comet/serde/arrays.scala:
##########
@@ -200,6 +201,88 @@ object CometArrayDistinct extends CometExpressionSerde[ArrayDistinct] {
   }
 }
 
+object CometSortArray extends CometExpressionSerde[SortArray] {
+  private def containsFloatingPoint(dt: DataType): Boolean = {
+    dt match {
+      case FloatType | DoubleType => true
+      case ArrayType(elementType, _) => containsFloatingPoint(elementType)
+      case StructType(fields) => fields.exists(f => containsFloatingPoint(f.dataType))
+      case MapType(keyType, valueType, _) =>
+        containsFloatingPoint(keyType) || containsFloatingPoint(valueType)
+      case _ => false
+    }
+  }
+
+  private def canRank(dt: DataType, nestedInArray: Boolean = false): Boolean = {
+    dt match {
+      case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
+          _: DoubleType | _: DecimalType =>
+        true
+      case _: DateType | _: TimestampType | _: TimestampNTZType =>
+        true
+      // DataFusion's array_sort compares nested arrays through Arrow's rank kernel.
+      // That kernel does not support Struct or Null child values,
+      // so array<array<struct<...>>> and array<array<null>> would fail at runtime.
+      case _: NullType if !nestedInArray =>
+        true
+      case _: BooleanType | _: BinaryType | _: StringType =>
+        true
+      case ArrayType(elementType, _) =>
+        canRank(elementType, nestedInArray = true)
+      case StructType(fields) if !nestedInArray =>
+        fields.forall(f => canRank(f.dataType))
+      case _ =>
+        false
+    }
+  }
+
+  override def getSupportLevel(expr: SortArray): SupportLevel = {
+    val elementType = expr.base.dataType.asInstanceOf[ArrayType].elementType
+
+    if (!canRank(elementType)) {
+      Unsupported(Some(s"Sort on array element type $elementType is not supported"))
+    } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() &&
+      containsFloatingPoint(elementType)) {
+      Incompatible(
+        Some(
+          "Sorting on floating-point is not 100% compatible with Spark, and Comet is running " +
+            s"with ${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true. " +
+            s"${CometConf.COMPAT_GUIDE}"))
+    } else {
+      Compatible()
+    }
+  }
+
+  override def convert(
+      expr: SortArray,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val arrayExprProto = exprToProtoInternal(expr.base, inputs, binding)
+    val sortDirectionExprProto = expr.ascendingOrder match {

Review Comment:
   nit: The `convert` method pattern matches on `expr.ascendingOrder` twice,
once to extract the sort direction and once to extract the null ordering. We
can make this code more concise by combining the two into a single pattern
match that returns both values as a Scala tuple.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to