Zouxxyy commented on code in PR #6028:
URL: https://github.com/apache/paimon/pull/6028#discussion_r2267126681


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala:
##########
@@ -128,7 +128,22 @@ abstract class PaimonBaseScan(
     } else {
       ""
     }
-    s"PaimonScan: [${table.name}]" + pushedFiltersStr +
+    val pushedTopNFilterStr = pushDownTopN
+      .map(
+        topN => {
+          val ordersStr = topN

Review Comment:
   we can add a toString in topN



##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala:
##########
@@ -90,6 +97,57 @@ class PaimonScanBuilder(table: InnerTable)
     false
   }
 
+  override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
+    if (hasPostScanPredicates) {
+      return false
+    }
+
+    if (!table.isInstanceOf[FileStoreTable]) {
+      return false
+    }
+
+    val coreOptions = CoreOptions.fromMap(table.options())
+    if (coreOptions.deletionVectorsEnabled()) {
+      return false
+    }
+
+    if (orders.length != 1) {
+      return false
+    }
+
+    val order = orders(0)
+    if (!order.expression().isInstanceOf[NamedReference]) {
+      return false
+    }
+
+    val fieldName = 
order.expression().asInstanceOf[NamedReference].fieldNames().mkString(".")
+    val rowType = table.rowType()
+    if (rowType.notContainsField(fieldName)) {
+      return false
+    }
+
+    val field = rowType.getField(fieldName)
+    val ref = new FieldRef(field.id(), field.name(), field.`type`())
+
+    val nullOrdering = order.nullOrdering() match {
+      case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST
+      case _ => NullOrdering.NULLS_FIRST

Review Comment:
   match all types otherwise exception



##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala:
##########
@@ -90,6 +97,57 @@ class PaimonScanBuilder(table: InnerTable)
     false
   }
 
+  override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
+    if (hasPostScanPredicates) {
+      return false
+    }
+
+    if (!table.isInstanceOf[FileStoreTable]) {
+      return false
+    }
+
+    val coreOptions = CoreOptions.fromMap(table.options())
+    if (coreOptions.deletionVectorsEnabled()) {
+      return false
+    }
+
+    if (orders.length != 1) {
+      return false
+    }
+
+    val order = orders(0)
+    if (!order.expression().isInstanceOf[NamedReference]) {
+      return false
+    }
+
+    val fieldName = 
order.expression().asInstanceOf[NamedReference].fieldNames().mkString(".")

Review Comment:
    ```scala
       val fieldName = orders.head.expression() match {
         case nr: NamedReference => nr.fieldNames.mkString(".")
         case _ => return false
       }
   ```



##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala:
##########
@@ -90,6 +97,57 @@ class PaimonScanBuilder(table: InnerTable)
     false
   }
 
+  override def pushTopN(orders: Array[SortOrder], limit: Int): Boolean = {
+    if (hasPostScanPredicates) {
+      return false
+    }
+
+    if (!table.isInstanceOf[FileStoreTable]) {
+      return false
+    }
+
+    val coreOptions = CoreOptions.fromMap(table.options())
+    if (coreOptions.deletionVectorsEnabled()) {
+      return false
+    }
+
+    if (orders.length != 1) {
+      return false
+    }
+
+    val order = orders(0)
+    if (!order.expression().isInstanceOf[NamedReference]) {
+      return false
+    }
+
+    val fieldName = 
order.expression().asInstanceOf[NamedReference].fieldNames().mkString(".")
+    val rowType = table.rowType()
+    if (rowType.notContainsField(fieldName)) {
+      return false
+    }
+
+    val field = rowType.getField(fieldName)
+    val ref = new FieldRef(field.id(), field.name(), field.`type`())
+
+    val nullOrdering = order.nullOrdering() match {
+      case expressions.NullOrdering.NULLS_LAST => NullOrdering.NULLS_LAST
+      case _ => NullOrdering.NULLS_FIRST
+    }
+
+    val direction = order.direction() match {
+      case expressions.SortDirection.DESCENDING => SortDirection.DESCENDING
+      case _ => SortDirection.ASCENDING
+    }
+
+    val sort = new SortValue(ref, direction, nullOrdering)
+    pushDownTopN = Some(new TopN(Collections.singletonList(sort), limit))
+
+    // just make the best effort to push down TopN
+    false
+  }
+
+  override def isPartiallyPushed: Boolean = super.isPartiallyPushed

Review Comment:
   is this needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to