This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fde87c7073 [core] ro table support fallback branch (#5544)
fde87c7073 is described below
commit fde87c7073aa0dacc36c73bdbf31d7506991ee78
Author: jerry <[email protected]>
AuthorDate: Tue May 13 11:56:51 2025 +0800
[core] ro table support fallback branch (#5544)
---
.../paimon/table/FallbackReadFileStoreTable.java | 31 +++++++++++++---------
.../paimon/table/system/ReadOptimizedTable.java | 10 ++++++-
.../apache/paimon/flink/CatalogTableITCase.java | 30 ++++++++++++++++-----
3 files changed, 51 insertions(+), 20 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 33c7b2c472..07d837bd42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -76,6 +76,10 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
Preconditions.checkArgument(!(fallback instanceof FallbackReadFileStoreTable));
}
+ public FileStoreTable fallback() {
+ return fallback;
+ }
+
@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
return new FallbackReadFileStoreTable(
@@ -170,7 +174,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
@Override
public DataTableScan newScan() {
validateSchema();
- return new Scan();
+ return new FallbackReadScan(wrapped.newScan(), fallback.newScan());
}
private void validateSchema() {
@@ -229,46 +233,47 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
return true;
}
- private class Scan implements DataTableScan {
+ /** Scan implementation for {@link FallbackReadFileStoreTable}. */
+ public static class FallbackReadScan implements DataTableScan {
private final DataTableScan mainScan;
private final DataTableScan fallbackScan;
- private Scan() {
- this.mainScan = wrapped.newScan();
- this.fallbackScan = fallback.newScan();
+ public FallbackReadScan(DataTableScan mainScan, DataTableScan fallbackScan) {
+ this.mainScan = mainScan;
+ this.fallbackScan = fallbackScan;
}
@Override
- public Scan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
+ public FallbackReadScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
mainScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
fallbackScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
return this;
}
@Override
- public Scan withFilter(Predicate predicate) {
+ public FallbackReadScan withFilter(Predicate predicate) {
mainScan.withFilter(predicate);
fallbackScan.withFilter(predicate);
return this;
}
@Override
- public Scan withLimit(int limit) {
+ public FallbackReadScan withLimit(int limit) {
mainScan.withLimit(limit);
fallbackScan.withLimit(limit);
return this;
}
@Override
- public Scan withPartitionFilter(Map<String, String> partitionSpec) {
+ public FallbackReadScan withPartitionFilter(Map<String, String> partitionSpec) {
mainScan.withPartitionFilter(partitionSpec);
fallbackScan.withPartitionFilter(partitionSpec);
return this;
}
@Override
- public Scan withPartitionFilter(List<BinaryRow> partitions) {
+ public FallbackReadScan withPartitionFilter(List<BinaryRow> partitions) {
mainScan.withPartitionFilter(partitions);
fallbackScan.withPartitionFilter(partitions);
return this;
@@ -282,21 +287,21 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
}
@Override
- public Scan withBucketFilter(Filter<Integer> bucketFilter) {
+ public FallbackReadScan withBucketFilter(Filter<Integer> bucketFilter) {
mainScan.withBucketFilter(bucketFilter);
fallbackScan.withBucketFilter(bucketFilter);
return this;
}
@Override
- public Scan withLevelFilter(Filter<Integer> levelFilter) {
+ public FallbackReadScan withLevelFilter(Filter<Integer> levelFilter) {
mainScan.withLevelFilter(levelFilter);
fallbackScan.withLevelFilter(levelFilter);
return this;
}
@Override
- public Scan withMetricRegistry(MetricRegistry metricRegistry) {
+ public FallbackReadScan withMetricRegistry(MetricRegistry metricRegistry) {
mainScan.withMetricRegistry(metricRegistry);
fallbackScan.withMetricRegistry(metricRegistry);
return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index caa0b133cc..2e6e5beb06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -28,10 +28,12 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.DataTableStreamScan;
import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.StreamDataTableScan;
@@ -130,7 +132,13 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
}
@Override
- public DataTableBatchScan newScan() {
+ public DataTableScan newScan() {
+ if (wrapped instanceof FallbackReadFileStoreTable) {
+ FallbackReadFileStoreTable table = (FallbackReadFileStoreTable) wrapped;
+ return (new FallbackReadFileStoreTable.FallbackReadScan(
+ table.wrapped().newScan(), table.fallback().newScan()))
+ .withLevelFilter(l -> l == coreOptions().numLevels() - 1);
+ }
return new DataTableBatchScan(
wrapped.schema(),
coreOptions(),
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 0797322c0d..a9395a0a83 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -1097,11 +1097,25 @@ public class CatalogTableITCase extends CatalogITCaseBase {
@Test
public void testReadOptimizedTable() {
sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '1')");
- innerTestReadOptimizedTable();
+ innerTestReadOptimizedTableAndCheckData("T");
sql("DROP TABLE T");
sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '-1')");
- innerTestReadOptimizedTable();
+ innerTestReadOptimizedTableAndCheckData("T");
+ }
+
+ @Test
+ public void testReadOptimizedTableFallBack() {
+ sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '1')");
+ sql("CALL sys.create_branch('default.T', 'stream')");
+ sql("ALTER TABLE T SET ('scan.fallback-branch' = 'stream')");
+ innerTestReadOptimizedTableAndCheckData("T$branch_stream");
+
+ sql("DROP TABLE T");
+ sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH ('bucket' = '-1')");
+ sql("CALL sys.create_branch('default.T', 'stream')");
+ sql("ALTER TABLE T SET ('scan.fallback-branch' = 'stream')");
+ innerTestReadOptimizedTableAndCheckData("T$branch_stream");
}
@Test
@@ -1173,21 +1187,25 @@ public class CatalogTableITCase extends CatalogITCaseBase {
assertThat(row.getField(6)).isNotNull();
}
- private void innerTestReadOptimizedTable() {
+ private void innerTestReadOptimizedTableAndCheckData(String insertTableName) {
// full compaction will always be performed at the end of batch jobs, as long as
// full-compaction.delta-commits is set, regardless of its value
sql(
- "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (1, 10), (2, 20)");
+ String.format(
+ "INSERT INTO %s /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (1, 10), (2, 20)",
+ insertTableName));
List<Row> result = sql("SELECT k, v FROM T$ro ORDER BY k");
assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20));
// no compaction, so result of ro table does not change
- sql("INSERT INTO T VALUES (1, 11), (3, 30)");
+ sql(String.format("INSERT INTO %s VALUES (1, 11), (3, 30)", insertTableName));
result = sql("SELECT k, v FROM T$ro ORDER BY k");
assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20));
sql(
- "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (2, 21), (3, 31)");
+ String.format(
+ "INSERT INTO %s /*+ OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (2, 21), (3, 31)",
+ insertTableName));
result = sql("SELECT k, v FROM T$ro ORDER BY k");
assertThat(result).containsExactly(Row.of(1, 11), Row.of(2, 21), Row.of(3, 31));
}