This is an automated email from the ASF dual-hosted git repository.
aitozi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3c9fe281e3 [core] Fix chain table partition listing under data filters. (#7268)
3c9fe281e3 is described below
commit 3c9fe281e3fbd64938938aa300fd34f4a1afc21f
Author: Junrui Lee <[email protected]>
AuthorDate: Tue Mar 10 10:17:24 2026 +0800
[core] Fix chain table partition listing under data filters. (#7268)
---
.../apache/paimon/table/ChainGroupReadTable.java | 185 ++++++++++++++++++---
.../org/apache/paimon/table/source/ChainSplit.java | 2 +-
.../apache/paimon/table/source/ChainSplitTest.java | 2 +-
.../apache/paimon/spark/SparkChainTableITCase.java | 57 ++++++-
4 files changed, 221 insertions(+), 25 deletions(-)
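
For readers skimming the diff: the core of the fix is that ChainTableBatchScan#withFilter now splits the pushed-down predicate so that only the partition-key conjuncts drive partition listing, while the remaining row-level conjuncts are applied when the splits are read (see newFilteredScan below). The commit title and the new Spark IT cases suggest that, previously, data filters could interfere with partition listing. Below is a minimal, self-contained sketch of that splitting idea; the ColumnPredicate record is a hypothetical stand-in, not Paimon's Predicate API (the patch itself uses PartitionPredicate.splitPartitionPredicatesAndDataPredicates):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;

    // Illustrative sketch only, not Paimon code: split a conjunction of simple
    // column predicates into the part that touches only partition keys (safe to
    // push into partition listing) and the row-level remainder (kept as a data
    // filter for reading).
    public class PredicateSplitSketch {

        // Hypothetical stand-in for a single column predicate.
        record ColumnPredicate(String column, String op, String literal) {}

        static List<List<ColumnPredicate>> split(
                List<ColumnPredicate> conjuncts, Set<String> partitionKeys) {
            List<ColumnPredicate> partitionPart = new ArrayList<>();
            List<ColumnPredicate> dataPart = new ArrayList<>();
            for (ColumnPredicate p : conjuncts) {
                // A conjunct may prune partitions only if it references nothing
                // but partition columns; everything else stays row-level.
                (partitionKeys.contains(p.column()) ? partitionPart : dataPart).add(p);
            }
            return List.of(partitionPart, dataPart);
        }

        public static void main(String[] args) {
            // Mirrors the new IT case "dt = '20250811' and t1 = 4":
            // dt is the partition key, t1 is an ordinary data column.
            List<ColumnPredicate> conjuncts =
                    List.of(
                            new ColumnPredicate("dt", "=", "20250811"),
                            new ColumnPredicate("t1", "=", "4"));
            List<List<ColumnPredicate>> parts = split(conjuncts, Set.of("dt"));
            System.out.println("partition predicates: " + parts.get(0)); // the dt conjunct
            System.out.println("data predicates: " + parts.get(1)); // the t1 conjunct
        }
    }
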
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
index 9b7594cef1..36f9778ee3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -26,6 +26,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReader;
@@ -39,7 +40,9 @@ import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import java.io.IOException;
@@ -50,6 +53,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -130,6 +134,9 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable {
private final CoreOptions options;
private final RecordComparator partitionComparator;
private final ChainGroupReadTable chainGroupReadTable;
+ private PartitionPredicate partitionPredicate;
+ private Predicate dataPredicate;
+ private Filter<Integer> bucketFilter;
public ChainTableBatchScan(
DataTableScan mainScan,
@@ -153,10 +160,98 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable {
tableSchema.logicalPartitionType().getFieldTypes());
}
+ @Override
+ public ChainTableBatchScan withFilter(Predicate predicate) {
+ super.withFilter(predicate);
+ if (predicate != null) {
+ Pair<Optional<PartitionPredicate>, List<Predicate>> pair =
+                    PartitionPredicate.splitPartitionPredicatesAndDataPredicates(
+ predicate,
+ tableSchema.logicalRowType(),
+ tableSchema.partitionKeys());
+ setPartitionPredicate(pair.getLeft().orElse(null));
+ dataPredicate =
+                    pair.getRight().isEmpty() ? null : PredicateBuilder.and(pair.getRight());
+ }
+ return this;
+ }
+
+ @Override
+        public ChainTableBatchScan withPartitionFilter(Map<String, String> partitionSpec) {
+ super.withPartitionFilter(partitionSpec);
+ if (partitionSpec != null) {
+ setPartitionPredicate(
+ PartitionPredicate.fromMap(
+ tableSchema.logicalPartitionType(),
+ partitionSpec,
+ options.partitionDefaultName()));
+ }
+ return this;
+ }
+
+ @Override
+        public ChainTableBatchScan withPartitionFilter(List<BinaryRow> partitions) {
+ super.withPartitionFilter(partitions);
+ if (partitions != null) {
+ setPartitionPredicate(
+ PartitionPredicate.fromMultiple(
+                                tableSchema.logicalPartitionType(), partitions));
+ }
+ return this;
+ }
+
+ @Override
+        public ChainTableBatchScan withPartitionsFilter(List<Map<String, String>> partitions) {
+ super.withPartitionsFilter(partitions);
+ if (partitions != null) {
+ setPartitionPredicate(
+ PartitionPredicate.fromMaps(
+ tableSchema.logicalPartitionType(),
+ partitions,
+ options.partitionDefaultName()));
+ }
+ return this;
+ }
+
+ @Override
+        public ChainTableBatchScan withPartitionFilter(PartitionPredicate partitionPredicate) {
+ super.withPartitionFilter(partitionPredicate);
+ if (partitionPredicate != null) {
+ setPartitionPredicate(partitionPredicate);
+ }
+ return this;
+ }
+
+ @Override
+        public ChainTableBatchScan withPartitionFilter(Predicate partitionPredicate) {
+ super.withPartitionFilter(partitionPredicate);
+ if (partitionPredicate != null) {
+ setPartitionPredicate(
+ PartitionPredicate.fromPredicate(
+                                tableSchema.logicalPartitionType(), partitionPredicate));
+ }
+ return this;
+ }
+
+ @Override
+        public ChainTableBatchScan withBucketFilter(Filter<Integer> bucketFilter) {
+ this.bucketFilter = bucketFilter;
+ super.withBucketFilter(bucketFilter);
+ return this;
+ }
+
+ /**
+ * Builds a plan for chain tables.
+ *
+         * <p>Partitions that exist in the snapshot branch (based on partition predicates only) are
+         * treated as complete and are read directly from snapshot, subject to row-level predicates.
+         * Partitions that exist only in the delta branch are planned as chain splits by pairing
+         * each delta partition with the latest snapshot partition at or before it (if any), so the
+         * reader sees a full partition view.
+ */
@Override
public Plan plan() {
List<Split> splits = new ArrayList<>();
- Set<BinaryRow> completePartitions = new HashSet<>();
PredicateBuilder builder = new PredicateBuilder(tableSchema.logicalPartitionType());
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
@@ -170,21 +265,22 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable {
new ChainSplit(
dataSplit.partition(),
dataSplit.dataFiles(),
- fileBucketPathMapping,
- fileBranchMapping));
- completePartitions.add(dataSplit.partition());
+ fileBranchMapping,
+ fileBucketPathMapping));
}
- List<BinaryRow> remainingPartitions =
- fallbackScan.listPartitions().stream()
- .filter(p -> !completePartitions.contains(p))
+
+ Set<BinaryRow> snapshotPartitions =
+ new HashSet<>(
+                            newPartitionListingScan(true, partitionPredicate).listPartitions());
+
+            DataTableScan deltaPartitionScan = newPartitionListingScan(false, partitionPredicate);
+ List<BinaryRow> deltaPartitions =
+ deltaPartitionScan.listPartitions().stream()
+ .filter(p -> !snapshotPartitions.contains(p))
+ .sorted(partitionComparator)
.collect(Collectors.toList());
- if (!remainingPartitions.isEmpty()) {
- fallbackScan.withPartitionFilter(remainingPartitions);
-                List<BinaryRow> deltaPartitions = fallbackScan.listPartitions();
- deltaPartitions =
- deltaPartitions.stream()
- .sorted(partitionComparator)
- .collect(Collectors.toList());
+
+ if (!deltaPartitions.isEmpty()) {
BinaryRow maxPartition = deltaPartitions.get(deltaPartitions.size() - 1);
Predicate snapshotPredicate =
ChainTableUtils.createTriangularPredicate(
@@ -192,8 +288,13 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable {
partitionConverter,
builder::equal,
builder::lessThan);
- mainScan.withPartitionFilter(snapshotPredicate);
- List<BinaryRow> candidateSnapshotPartitions =
mainScan.listPartitions();
+ PartitionPredicate snapshotPartitionPredicate =
+ PartitionPredicate.fromPredicate(
+                            tableSchema.logicalPartitionType(), snapshotPredicate);
+ DataTableScan snapshotPartitionsScan =
+                    newPartitionListingScan(true, snapshotPartitionPredicate);
+ List<BinaryRow> candidateSnapshotPartitions =
+ snapshotPartitionsScan.listPartitions();
candidateSnapshotPartitions =
candidateSnapshotPartitions.stream()
.sorted(partitionComparator)
@@ -202,8 +303,8 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable {
ChainTableUtils.findFirstLatestPartitions(
deltaPartitions, candidateSnapshotPartitions, partitionComparator);
for (Map.Entry<BinaryRow, BinaryRow> partitionParis : partitionMapping.entrySet()) {
-                DataTableScan snapshotScan = chainGroupReadTable.newSnapshotScan();
-                DataTableScan deltaScan = chainGroupReadTable.newDeltaScan();
+ DataTableScan snapshotScan = newFilteredScan(true);
+ DataTableScan deltaScan = newFilteredScan(false);
if (partitionParis.getValue() == null) {
List<Predicate> predicates = new ArrayList<>();
predicates.add(
@@ -281,8 +382,8 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable {
.flatMap(
datsSplit ->
datsSplit.dataFiles().stream())
.collect(Collectors.toList()),
- fileBucketPathMapping,
- fileBranchMapping);
+ fileBranchMapping,
+ fileBucketPathMapping);
splits.add(split);
}
}
@@ -292,7 +393,49 @@ public class ChainGroupReadTable extends FallbackReadFileStoreTable {
@Override
public List<PartitionEntry> listPartitionEntries() {
- return super.listPartitionEntries();
+        DataTableScan snapshotScan = newPartitionListingScan(true, partitionPredicate);
+        DataTableScan deltaScan = newPartitionListingScan(false, partitionPredicate);
+ List<PartitionEntry> partitionEntries =
+ new ArrayList<>(snapshotScan.listPartitionEntries());
+ Set<BinaryRow> partitions =
+ partitionEntries.stream()
+ .map(PartitionEntry::partition)
+ .collect(Collectors.toSet());
+        List<PartitionEntry> fallBackPartitionEntries = deltaScan.listPartitionEntries();
+ fallBackPartitionEntries.stream()
+ .filter(e -> !partitions.contains(e.partition()))
+ .forEach(partitionEntries::add);
+ return partitionEntries;
+ }
+
+ private void setPartitionPredicate(PartitionPredicate predicate) {
+ this.partitionPredicate = predicate;
+ }
+
+ private DataTableScan newPartitionListingScan(
+ boolean snapshot, PartitionPredicate scanPartitionPredicate) {
+ DataTableScan scan =
+ snapshot
+ ? chainGroupReadTable.newSnapshotScan()
+ : chainGroupReadTable.newDeltaScan();
+ if (scanPartitionPredicate != null) {
+ scan.withPartitionFilter(scanPartitionPredicate);
+ }
+ return scan;
+ }
+
+ private DataTableScan newFilteredScan(boolean snapshot) {
+ DataTableScan scan =
+ snapshot
+ ? chainGroupReadTable.newSnapshotScan()
+ : chainGroupReadTable.newDeltaScan();
+ if (dataPredicate != null) {
+ scan.withFilter(dataPredicate);
+ }
+ if (bucketFilter != null) {
+ scan.withBucketFilter(bucketFilter);
+ }
+ return scan;
}
}
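
The pairing behavior described in the plan() Javadoc above (each delta-only partition is matched with the latest snapshot partition at or before it) is conceptually a floor lookup over the sorted snapshot partitions. Below is a minimal sketch under simplifying assumptions: String partition values with natural ordering stand in for BinaryRow plus the table's RecordComparator; in the patch the real lookup is done by ChainTableUtils.findFirstLatestPartitions:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeSet;

    // Illustrative sketch only, not Paimon code: pair each delta-only partition
    // with the latest snapshot partition at or before it, or null if none exists.
    public class PartitionPairingSketch {

        static Map<String, String> pairPartitions(
                List<String> deltaPartitions, List<String> snapshotPartitions) {
            TreeSet<String> snapshots = new TreeSet<>(snapshotPartitions);
            Map<String, String> mapping = new LinkedHashMap<>();
            for (String delta : deltaPartitions) {
                // floor(): the greatest snapshot partition <= this delta partition.
                mapping.put(delta, snapshots.floor(delta));
            }
            return mapping;
        }

        public static void main(String[] args) {
            // Snapshot branch holds dt=20250811; delta branch adds 20250812/20250813.
            Map<String, String> mapping =
                    pairPartitions(List.of("20250812", "20250813"), List.of("20250811"));
            // Prints {20250812=20250811, 20250813=20250811}: each delta partition
            // chains back onto the 20250811 snapshot for a full partition view.
            System.out.println(mapping);
        }
    }
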
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
index 5339cd06a6..dfa364f96a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ChainSplit.java
@@ -183,6 +183,6 @@ public class ChainSplit implements Split {
}
return new ChainSplit(
-                logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping);
+                logicalPartition, dataFiles, fileBranchMapping, fileBucketPathMapping);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
index c042f6a371..e281d7557f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/ChainSplitTest.java
@@ -56,7 +56,7 @@ public class ChainSplitTest {
}
ChainSplit split =
new ChainSplit(
-                        logicalPartition, dataFiles, fileBucketPathMapping, fileBranchMapping);
+                        logicalPartition, dataFiles, fileBranchMapping, fileBucketPathMapping);
byte[] bytes = InstantiationUtil.serializeObject(split);
ChainSplit newSplit =
InstantiationUtil.deserializeObject(bytes, ChainSplit.class.getClassLoader());
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index 2465a925d4..83a77f1232 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -143,7 +143,7 @@ public class SparkChainTableITCase {
spark.sql(
"insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811') values (2, 2, '1-1' ),(4, 1, '1' );");
spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (3, 2, '1-1' ),(4, 2, '1-1' );");
+                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (3, 2, '1-1' ),(4, 2, '1-1' ),(7, 1, 'd7' );");
spark.sql(
"insert overwrite table `my_db1`.`chain_test` partition (dt = '20250813') values (5, 1, '1' ),(6, 1, '1' );");
spark.sql(
@@ -202,6 +202,48 @@ public class SparkChainTableITCase {
"[3,1,1,20250811]",
"[4,1,1,20250811]");
+ /** Chain read with filter */
+ assertThat(
+ spark
+ .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250811' and t1 = 1")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,2,1-1,20250811]");
+ assertThat(
+ spark
+ .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250811' and t1 = 4")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[4,1,1,20250811]");
+ assertThat(
+ spark
+ .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250811' and t1 = 7")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .isEmpty();
+
+ assertThat(
+ spark
+ .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt in ('20250811', '20250812') and t1 = 1")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[1,2,1-1,20250811]", "[1,2,1-1,20250812]");
+
+ /** Snapshot read with filter */
+ assertThat(
+ spark.sql(
+                                "SELECT * FROM `my_db1`.`chain_test` where dt = '20250812' and t1 = 7")
+ .collectAsList())
+ .isEmpty();
+
/** Multi partition Read */
assertThat(
spark
@@ -246,7 +288,8 @@ public class SparkChainTableITCase {
"[2,2,1-1,20250811]",
"[4,1,1,20250811]",
"[3,2,1-1,20250812]",
- "[4,2,1-1,20250812]");
+ "[4,2,1-1,20250812]",
+ "[7,1,d7,20250812]");
/** Hybrid read */
assertThat(
@@ -402,6 +445,16 @@ public class SparkChainTableITCase {
"[3,1,1,20250810,23]",
"[4,1,1,20250810,23]");
+ /** Chain read with non-partition filter */
+ assertThat(
+ spark
+ .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '23' and t1 = 1")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,2,1-1,20250810,23]");
+
/** Multi partition Read */
assertThat(
spark