This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 186efa0207 [core] Fix  paimon_incremental_query with limit push down (#7269)
186efa0207 is described below

commit 186efa0207459728d4f6137f2ec924d728f22704
Author: XiaoHongbo <[email protected]>
AuthorDate: Thu Feb 12 06:35:16 2026 +0800

    [core] Fix  paimon_incremental_query with limit push down (#7269)
---
 .../paimon/table/source/DataTableBatchScan.java     | 17 +++++++++++++++--
 .../paimon/spark/sql/TableValuedFunctionsTest.scala | 21 +++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 457229d7ad..19f55ae3c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -137,7 +137,14 @@ public class DataTableBatchScan extends AbstractDataTableScan {
 
         long scannedRowCount = 0;
         SnapshotReader.Plan plan = ((ScannedResult) result).plan();
-        List<DataSplit> splits = plan.dataSplits();
+        List<Split> planSplits = plan.splits();
+        // Limit pushdown only supports DataSplit. Skip for IncrementalSplit.
+        if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
+            return Optional.of(result);
+        }
+        @SuppressWarnings("unchecked")
+        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
+
         LOG.info("Applying limit pushdown. Original splits count: {}", splits.size());
         if (splits.isEmpty()) {
             return Optional.of(result);
@@ -193,7 +200,13 @@ public class DataTableBatchScan extends AbstractDataTableScan {
         }
 
         SnapshotReader.Plan plan = ((ScannedResult) result).plan();
-        List<DataSplit> splits = plan.dataSplits();
+        List<Split> planSplits = plan.splits();
+        // TopN pushdown only supports DataSplit. Skip for IncrementalSplit.
+        if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
+            return Optional.of(result);
+        }
+        @SuppressWarnings("unchecked")
+        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
         if (splits.isEmpty()) {
             return Optional.of(result);
         }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index b3012e2f90..68f97743d3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -344,6 +344,27 @@ class TableValuedFunctionsTest extends PaimonHiveTestBase {
     }
   }
 
+  test("incremental query by tag with LIMIT") {
+    sql("use paimon")
+    withTable("t") {
+      spark.sql("""
+                  |CREATE TABLE t (a INT, b INT, c STRING)
+                  |USING paimon
+                  |TBLPROPERTIES ('primary-key'='a,b', 'bucket' = '2')
+                  |PARTITIONED BY (a)
+                  |""".stripMargin)
+      spark.sql("INSERT INTO t VALUES (1, 1, '1'), (2, 2, '2')")
+      sql("CALL sys.create_tag('t', 'tag1')")
+      spark.sql("INSERT INTO t VALUES (1, 3, '3'), (2, 4, '4')")
+      sql("CALL sys.create_tag('t', 'tag2')")
+
+      checkAnswer(
+        spark.sql(
+          "SELECT * FROM paimon_incremental_query('t', 'tag1', 'tag2') ORDER BY a, b LIMIT 5"),
+        Seq(Row(1, 3, "3"), Row(2, 4, "4")))
+    }
+  }
+
   private def incrementalDF(tableIdent: String, start: Int, end: Int): DataFrame = {
     spark.read
       .format("paimon")

Reply via email to