This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 40505091c7 [core] Minor refactor partition predicates in
FallbackReadScan (#7950)
40505091c7 is described below
commit 40505091c7e86cd8c3af1c087b9604f340046eb3
Author: Juntao Zhang <[email protected]>
AuthorDate: Wed May 27 10:43:01 2026 +0800
[core] Minor refactor partition predicates in FallbackReadScan (#7950)
This PR is a prerequisite for [`[spark] Support compact_chain_table
procedure`](https://github.com/apache/paimon/pull/7313).
`FallbackReadScan` currently uses a single `partitionPredicate` for both
main and fallback scans. However, in chain table compaction scenarios,
we need to apply different
partition filters to the main (snapshot) branch and the fallback (delta)
branch. For example, when overwriting an existing partition, we need to:
- Exclude the target partition from the main scan
- Include only the target partition in the fallback scan
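This PR refactors `FallbackReadScan` to support separate partition
predicates for the main and fallback scans: a new
`withPartitionFilter(mainPartitionPredicate, fallbackPartitionPredicate)`
overload applies each predicate to its own scan, while
`setPartitionPredicate(predicate)` still applies one predicate to both.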
---
.../apache/paimon/table/ChainGroupReadTable.java | 12 ++--
.../paimon/table/FallbackReadFileStoreTable.java | 48 +++++++++++----
.../table/FallbackReadFileStoreTableTest.java | 69 ++++++++++++++++++++++
3 files changed, 110 insertions(+), 19 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
index 6267f3eb04..915f46cd8e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -236,7 +236,6 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
@Override
public Plan plan() {
List<Split> splits = new ArrayList<>();
- PartitionPredicate partitionPredicate = getPartitionPredicate();
PredicateBuilder builder = new
PredicateBuilder(tableSchema.logicalPartitionType());
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
@@ -256,11 +255,11 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
Set<BinaryRow> snapshotPartitions =
new HashSet<>(
- newChainPartitionListingScan(true,
partitionPredicate)
+ newChainPartitionListingScan(true,
getMainPartitionPredicate())
.listPartitions());
DataTableScan deltaPartitionScan =
- newChainPartitionListingScan(false, partitionPredicate);
+ newChainPartitionListingScan(false,
getFallbackPartitionPredicate());
List<BinaryRow> deltaPartitions =
deltaPartitionScan.listPartitions().stream()
.filter(p -> !snapshotPartitions.contains(p))
@@ -433,9 +432,10 @@ public class ChainGroupReadTable extends
FallbackReadFileStoreTable {
@Override
public List<PartitionEntry> listPartitionEntries() {
- PartitionPredicate partitionPredicate = getPartitionPredicate();
- DataTableScan snapshotScan = newChainPartitionListingScan(true,
partitionPredicate);
- DataTableScan deltaScan = newChainPartitionListingScan(false,
partitionPredicate);
+ DataTableScan snapshotScan =
+ newChainPartitionListingScan(true,
getMainPartitionPredicate());
+ DataTableScan deltaScan =
+ newChainPartitionListingScan(false,
getFallbackPartitionPredicate());
List<PartitionEntry> partitionEntries =
new ArrayList<>(snapshotScan.listPartitionEntries());
Set<BinaryRow> partitions =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 75353983bb..5a1a5b5a89 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -369,7 +369,8 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
protected final Function<FileStoreTable, DataTableScan> scanCreator;
protected final DataTableScan mainScan;
protected final DataTableScan fallbackScan;
- private PartitionPredicate partitionPredicate;
+ private PartitionPredicate mainPartitionPredicate;
+ private PartitionPredicate fallbackPartitionPredicate;
public FallbackReadScan(
FileStoreTable wrappedTable,
@@ -475,6 +476,15 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
return this;
}
+ public InnerTableScan withPartitionFilter(
+ PartitionPredicate mainPartitionPredicate,
+ PartitionPredicate fallbackPartitionPredicate) {
+ mainScan.withPartitionFilter(mainPartitionPredicate);
+ fallbackScan.withPartitionFilter(fallbackPartitionPredicate);
+ setPartitionPredicate(mainPartitionPredicate,
fallbackPartitionPredicate);
+ return this;
+ }
+
@Override
public FallbackReadScan withBucketFilter(Filter<Integer> bucketFilter)
{
mainScan.withBucketFilter(bucketFilter);
@@ -521,15 +531,14 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
public TableScan.Plan plan() {
List<Split> splits = new ArrayList<>();
Set<BinaryRow> completePartitions =
- new HashSet<>(
- newPartitionListingScan(true,
partitionPredicate).listPartitions());
+ new
HashSet<>(newPartitionListingScan(true).listPartitions());
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
splits.add(toFallbackSplit(dataSplit, false));
}
List<BinaryRow> remainingPartitions =
- newPartitionListingScan(false,
partitionPredicate).listPartitions().stream()
+ newPartitionListingScan(false).listPartitions().stream()
.filter(p -> !completePartitions.contains(p))
.collect(Collectors.toList());
if (!remainingPartitions.isEmpty()) {
@@ -543,8 +552,8 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
@Override
public List<PartitionEntry> listPartitionEntries() {
- DataTableScan mainListingScan = newPartitionListingScan(true,
partitionPredicate);
- DataTableScan fallbackListingScan = newPartitionListingScan(false,
partitionPredicate);
+ DataTableScan mainListingScan = newPartitionListingScan(true);
+ DataTableScan fallbackListingScan = newPartitionListingScan(false);
List<PartitionEntry> partitionEntries =
new ArrayList<>(mainListingScan.listPartitionEntries());
Set<BinaryRow> partitions =
@@ -560,18 +569,31 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
}
protected void setPartitionPredicate(PartitionPredicate predicate) {
- this.partitionPredicate = predicate;
+ this.mainPartitionPredicate = predicate;
+ this.fallbackPartitionPredicate = predicate;
+ }
+
+ protected void setPartitionPredicate(
+ PartitionPredicate mainPartitionPredicate,
+ PartitionPredicate fallbackPartitionPredicate) {
+ this.mainPartitionPredicate = mainPartitionPredicate;
+ this.fallbackPartitionPredicate = fallbackPartitionPredicate;
+ }
+
+ protected PartitionPredicate getMainPartitionPredicate() {
+ return mainPartitionPredicate;
}
- protected PartitionPredicate getPartitionPredicate() {
- return partitionPredicate;
+ protected PartitionPredicate getFallbackPartitionPredicate() {
+ return fallbackPartitionPredicate;
}
- private DataTableScan newPartitionListingScan(
- boolean isMain, PartitionPredicate scanPartitionPredicate) {
+ private DataTableScan newPartitionListingScan(boolean isMain) {
DataTableScan scan = scanCreator.apply(isMain ? wrappedTable :
fallbackTable);
- if (scanPartitionPredicate != null) {
- scan.withPartitionFilter(scanPartitionPredicate);
+ if (isMain && getMainPartitionPredicate() != null) {
+ scan.withPartitionFilter(getMainPartitionPredicate());
+ } else if (!isMain && getFallbackPartitionPredicate() != null) {
+ scan.withPartitionFilter(getFallbackPartitionPredicate());
}
return scan;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
index 483ef861eb..e0841df4d0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FallbackReadFileStoreTableTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
@@ -27,8 +28,10 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
@@ -53,9 +56,11 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.paimon.table.SchemaEvolutionTableTestBase.rowData;
@@ -333,6 +338,70 @@ public class FallbackReadFileStoreTableTest {
};
}
+ /**
+ * Test that FallbackReadScan uses separate partition predicates for main
and fallback scans.
+ * When withPartitionFilter(mainPredicate, fallbackPredicate) is called,
plan() should only list
+ * partitions matching the corresponding predicate from each branch.
+ */
+ @Test
+ public void testMainAndFallbackPartitionPredicates() throws Exception {
+ FileStoreTable mainTable = createTable();
+ writeDataIntoTable(mainTable, 0, rowData(1, 10), rowData(2, 20));
+
+ mainTable.createBranch("bc");
+ FileStoreTable branchTable = createTableFromBranch(mainTable, "bc");
+ writeDataIntoTable(
+ branchTable, 0, rowData(1, 100), rowData(2, 200), rowData(3,
300), rowData(4, 400));
+
+ FallbackReadFileStoreTable table =
+ new FallbackReadFileStoreTable(mainTable, branchTable, true);
+
+ RowType partitionType = RowType.of(new DataType[] {DataTypes.INT()},
new String[] {"pt"});
+ PartitionPredicate mainPredicate =
+ PartitionPredicate.fromMultiple(
+ partitionType,
Collections.singletonList(BinaryRow.singleColumn(1)));
+ PartitionPredicate fallbackPredicate =
+ PartitionPredicate.fromMultiple(
+ partitionType,
Collections.singletonList(BinaryRow.singleColumn(3)));
+
+ // Case 1: both predicates set, pt=1 from main, pt=3 from fallback
+ assertThat(
+ readAndCollect(
+ table,
+ scan ->
scan.withPartitionFilter(mainPredicate, fallbackPredicate)))
+ .containsExactlyInAnyOrder(Pair.of(1, 10), Pair.of(3, 300));
+
+ // Case 2: main predicate is null, fallback predicate set
+ assertThat(readAndCollect(table, scan ->
scan.withPartitionFilter(null, fallbackPredicate)))
+ .containsExactlyInAnyOrder(Pair.of(1, 10), Pair.of(2, 20),
Pair.of(3, 300));
+
+ // Case 3: main predicate set, fallback predicate is null
+ assertThat(readAndCollect(table, scan ->
scan.withPartitionFilter(mainPredicate, null)))
+ .containsExactlyInAnyOrder(
+ Pair.of(1, 10), Pair.of(2, 200), Pair.of(3, 300),
Pair.of(4, 400));
+
+ // Case 4: both null
+ assertThat(readAndCollect(table, scan ->
scan.withPartitionFilter(null, null)))
+ .containsExactlyInAnyOrder(
+ Pair.of(1, 10), Pair.of(2, 20), Pair.of(3, 300),
Pair.of(4, 400));
+ }
+
+ private List<Pair<Integer, Integer>> readAndCollect(
+ FallbackReadFileStoreTable table,
+ Consumer<FallbackReadFileStoreTable.FallbackReadScan> consumer)
+ throws Exception {
+ FallbackReadFileStoreTable.FallbackReadScan scan =
+ (FallbackReadFileStoreTable.FallbackReadScan) table.newScan();
+ consumer.accept(scan);
+ List<Pair<Integer, Integer>> result = new ArrayList<>();
+ for (Split split : scan.plan().splits()) {
+ RecordReader<InternalRow> reader =
table.newRead().createReader(split);
+ reader.forEachRemaining(r -> result.add(Pair.of(r.getInt(0),
r.getInt(1))));
+ reader.close();
+ }
+ return result;
+ }
+
@Test
void testSwitchToBranch() throws Exception {
String branchName = "bc";