This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6dd8a149dd [core] Should work with Split in DataTableBatchScan
6dd8a149dd is described below
commit 6dd8a149dde9ef829eb1b89bd51f48c647ce7aad
Author: JingsongLi <[email protected]>
AuthorDate: Thu Feb 12 06:48:14 2026 +0800
[core] Should work with Split in DataTableBatchScan
---
.../paimon/table/source/DataTableBatchScan.java | 25 ++++++----------------
.../table/source/TopNDataSplitEvaluator.java | 17 ++++++++-------
2 files changed, 15 insertions(+), 27 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 19f55ae3c1..4e0416f86e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -137,24 +137,17 @@ public class DataTableBatchScan extends AbstractDataTableScan {
long scannedRowCount = 0;
SnapshotReader.Plan plan = ((ScannedResult) result).plan();
- List<Split> planSplits = plan.splits();
- // Limit pushdown only supports DataSplit. Skip for IncrementalSplit.
- if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
- return Optional.of(result);
- }
- @SuppressWarnings("unchecked")
- List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
-
- LOG.info("Applying limit pushdown. Original splits count: {}", splits.size());
+ List<Split> splits = plan.splits();
if (splits.isEmpty()) {
return Optional.of(result);
}
+ LOG.info("Applying limit pushdown. Original splits count: {}", splits.size());
List<Split> limitedSplits = new ArrayList<>();
- for (DataSplit dataSplit : splits) {
- OptionalLong mergedRowCount = dataSplit.mergedRowCount();
+ for (Split split : splits) {
+ OptionalLong mergedRowCount = split.mergedRowCount();
if (mergedRowCount.isPresent()) {
- limitedSplits.add(dataSplit);
+ limitedSplits.add(split);
scannedRowCount += mergedRowCount.getAsLong();
if (scannedRowCount >= pushDownLimit) {
SnapshotReader.Plan newPlan =
@@ -200,13 +193,7 @@ public class DataTableBatchScan extends AbstractDataTableScan {
}
SnapshotReader.Plan plan = ((ScannedResult) result).plan();
- List<Split> planSplits = plan.splits();
- // TopN pushdown only supports DataSplit. Skip for IncrementalSplit.
- if (planSplits.stream().anyMatch(s -> !(s instanceof DataSplit))) {
- return Optional.of(result);
- }
- @SuppressWarnings("unchecked")
- List<DataSplit> splits = (List<DataSplit>) (List<?>) planSplits;
+ List<Split> splits = plan.splits();
if (splits.isEmpty()) {
return Optional.of(result);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
index 68deb5830b..b33f6e9212 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TopNDataSplitEvaluator.java
@@ -51,33 +51,34 @@ public class TopNDataSplitEvaluator {
this.schemaManager = schemaManager;
}
- public List<DataSplit> evaluate(SortValue order, int limit, List<DataSplit> splits) {
+ public List<Split> evaluate(SortValue order, int limit, List<Split> splits) {
if (limit > splits.size()) {
return splits;
}
return getTopNSplits(order, limit, splits);
}
- private List<DataSplit> getTopNSplits(SortValue order, int limit, List<DataSplit> splits) {
+ private List<Split> getTopNSplits(SortValue order, int limit, List<Split> splits) {
int index = order.field().index();
DataField field = schema.fields().get(index);
SimpleStatsEvolutions evolutions =
new SimpleStatsEvolutions((id) -> scanTableSchema(id).fields(), schema.id());
// extract the stats
- List<DataSplit> results = new ArrayList<>();
+ List<Split> results = new ArrayList<>();
List<RichSplit> richSplits = new ArrayList<>();
- for (DataSplit split : splits) {
+ for (Split split : splits) {
if (!minmaxAvailable(split, Collections.singleton(field.name()))) {
// unknown split, read it
results.add(split);
continue;
}
- Object min = split.minValue(index, field, evolutions);
- Object max = split.maxValue(index, field, evolutions);
- Long nullCount = split.nullCount(index, evolutions);
- richSplits.add(new RichSplit(split, min, max, nullCount));
+ DataSplit dataSplit = (DataSplit) split;
+ Object min = dataSplit.minValue(index, field, evolutions);
+ Object max = dataSplit.maxValue(index, field, evolutions);
+ Long nullCount = dataSplit.nullCount(index, evolutions);
+ richSplits.add(new RichSplit(dataSplit, min, max, nullCount));
}
// pick the TopN splits