This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dd8a149dd [core] Should work with Split in DataTableBatchScan
6dd8a149dd is described below

commit 6dd8a149dde9ef829eb1b89bd51f48c647ce7aad
Author: JingsongLi <[email protected]>
AuthorDate: Thu Feb 12 06:48:14 2026 +0800

    [core] Should work with Split in DataTableBatchScan
---
 .../paimon/table/source/DataTableBatchScan.java    | 25 ++++++----------------
 .../table/source/TopNDataSplitEvaluator.java       | 17 ++++++++-------
 2 files changed, 15 insertions(+), 27 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 19f55ae3c1..4e0416f86e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -137,24 +137,17 @@ public class DataTableBatchScan extends AbstractDataTableScan {
 
         long scannedRowCount = 0;
         SnapshotReader.Plan plan = ((ScannedResult) result).plan();
-        List<Split> planSplits = plan.splits();
-        // Limit pushdown only supports DataSplit. Skip for IncrementalSplit.
-        if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
-            return Optional.of(result);
-        }
-        @SuppressWarnings("unchecked")
-        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
-
-        LOG.info("Applying limit pushdown. Original splits count: {}", splits.size());
+        List<Split> splits = plan.splits();
         if (splits.isEmpty()) {
             return Optional.of(result);
         }
 
+        LOG.info("Applying limit pushdown. Original splits count: {}", splits.size());
         List<Split> limitedSplits = new ArrayList<>();
-        for (DataSplit dataSplit : splits) {
-            OptionalLong mergedRowCount = dataSplit.mergedRowCount();
+        for (Split split : splits) {
+            OptionalLong mergedRowCount = split.mergedRowCount();
             if (mergedRowCount.isPresent()) {
-                limitedSplits.add(dataSplit);
+                limitedSplits.add(split);
                 scannedRowCount += mergedRowCount.getAsLong();
                 if (scannedRowCount >= pushDownLimit) {
                     SnapshotReader.Plan newPlan =
@@ -200,13 +193,7 @@ public class DataTableBatchScan extends AbstractDataTableScan {
         }
 
         SnapshotReader.Plan plan = ((ScannedResult) result).plan();
-        List<Split> planSplits = plan.splits();
-        // TopN pushdown only supports DataSplit. Skip for IncrementalSplit.
-        if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
-            return Optional.of(result);
-        }
-        @SuppressWarnings("unchecked")
-        List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
+        List<Split> splits = plan.splits();
         if (splits.isEmpty()) {
             return Optional.of(result);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
index 68deb5830b..b33f6e9212 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
@@ -51,33 +51,34 @@ public class TopNDataSplitEvaluator {
         this.schemaManager = schemaManager;
     }
 
-    public List<DataSplit> evaluate(SortValue order, int limit, List<DataSplit> splits) {
+    public List<Split> evaluate(SortValue order, int limit, List<Split> splits) {
         if (limit > splits.size()) {
             return splits;
         }
         return getTopNSplits(order, limit, splits);
     }
 
-    private List<DataSplit> getTopNSplits(SortValue order, int limit, List<DataSplit> splits) {
+    private List<Split> getTopNSplits(SortValue order, int limit, List<Split> splits) {
         int index = order.field().index();
         DataField field = schema.fields().get(index);
         SimpleStatsEvolutions evolutions =
                 new SimpleStatsEvolutions((id) -> scanTableSchema(id).fields(), schema.id());
 
         // extract the stats
-        List<DataSplit> results = new ArrayList<>();
+        List<Split> results = new ArrayList<>();
         List<RichSplit> richSplits = new ArrayList<>();
-        for (DataSplit split : splits) {
+        for (Split split : splits) {
             if (!minmaxAvailable(split, Collections.singleton(field.name()))) {
                 // unknown split, read it
                 results.add(split);
                 continue;
             }
 
-            Object min = split.minValue(index, field, evolutions);
-            Object max = split.maxValue(index, field, evolutions);
-            Long nullCount = split.nullCount(index, evolutions);
-            richSplits.add(new RichSplit(split, min, max, nullCount));
+            DataSplit dataSplit = (DataSplit) split;
+            Object min = dataSplit.minValue(index, field, evolutions);
+            Object max = dataSplit.maxValue(index, field, evolutions);
+            Long nullCount = dataSplit.nullCount(index, evolutions);
+            richSplits.add(new RichSplit(dataSplit, min, max, nullCount));
         }
 
         // pick the TopN splits

Reply via email to