This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e917af004 [core] Support reading partition from fallback branch when
not found in current branch (#3816)
e917af004 is described below
commit e917af004d76366080e3ca014d49f683386aad98
Author: tsreaper <[email protected]>
AuthorDate: Wed Jul 31 14:36:37 2024 +0800
[core] Support reading partition from fallback branch when not found in
current branch (#3816)
---
docs/content/maintenance/manage-branches.md | 83 ++++++
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 8 +
.../paimon/privilege/PrivilegedFileStoreTable.java | 74 +----
.../DelegatedFileStoreTable.java} | 80 +----
.../paimon/table/FallbackReadFileStoreTable.java | 325 +++++++++++++++++++++
.../apache/paimon/table/FileStoreTableFactory.java | 29 ++
.../main/java/org/apache/paimon/table/Table.java | 2 +-
.../paimon/table/source/AbstractDataTableScan.java | 6 +
.../apache/paimon/table/source/InnerTableScan.java | 6 +
.../table/source/snapshot/SnapshotReader.java | 2 +
.../table/source/snapshot/SnapshotReaderImpl.java | 6 +
.../apache/paimon/table/system/AuditLogTable.java | 12 +
.../org/apache/paimon/flink/BranchSqlITCase.java | 76 ++++-
14 files changed, 570 insertions(+), 145 deletions(-)
diff --git a/docs/content/maintenance/manage-branches.md
b/docs/content/maintenance/manage-branches.md
index 4343214f2..22ca9b850 100644
--- a/docs/content/maintenance/manage-branches.md
+++ b/docs/content/maintenance/manage-branches.md
@@ -159,3 +159,86 @@ Run the following command:
{{< /tab >}}
{{< /tabs >}}
+
+### Batch Reading from Fallback Branch
+
+You can set the table option `scan.fallback-branch`
+so that when a batch job reads from the current branch, if a partition does
not exist,
+the reader will try to read this partition from the fallback branch.
+For streaming read jobs, this feature is currently not supported; such jobs will
only produce results from the current branch.
+
+What's the use case of this feature? Say you have created a Paimon table
partitioned by date.
+You have a long-running streaming job which inserts records into Paimon, so
that today's data can be queried in time.
+You also have a batch job which runs every night to insert yesterday's corrected
records into Paimon,
+so that the accuracy of the data can be guaranteed.
+
+When you query from this Paimon table, you would like to first read from the
results of the batch job.
+But if a partition (for example, today's partition) does not exist in its
results,
+then you would like to read from the results of the streaming job.
+In this case, you can create a branch for the streaming job, and set
`scan.fallback-branch` to this streaming branch.
+
+Let's look at an example.
+
+{{< tabs "read-fallback-branch" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+-- create Paimon table
+CREATE TABLE T (
+ dt STRING NOT NULL,
+ name STRING NOT NULL,
+ amount BIGINT
+) PARTITIONED BY (dt);
+
+-- create a branch for streaming job
+CALL sys.create_branch('default.T', 'test');
+
+-- set primary key and bucket number for the branch
+ALTER TABLE `T$branch_test` SET (
+ 'primary-key' = 'dt,name',
+ 'bucket' = '2',
+ 'changelog-producer' = 'lookup'
+);
+
+-- set fallback branch
+ALTER TABLE T SET (
+ 'scan.fallback-branch' = 'test'
+);
+
+-- write records into the streaming branch
+INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725',
'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6);
+
+-- write records into the default branch
+INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7);
+
+SELECT * FROM T;
+/*
++------------------+------------------+--------+
+| dt | name | amount |
++------------------+------------------+--------+
+| 20240725 | apple | 5 |
+| 20240725 | banana | 7 |
+| 20240726 | cherry | 3 |
+| 20240726 | pear | 6 |
++------------------+------------------+--------+
+*/
+
+-- reset fallback branch
+ALTER TABLE T RESET ( 'scan.fallback-branch' );
+
+-- now it only reads from default branch
+SELECT * FROM T;
+/*
++------------------+------------------+--------+
+| dt | name | amount |
++------------------+------------------+--------+
+| 20240725 | apple | 5 |
+| 20240725 | banana | 7 |
++------------------+------------------+--------+
+*/
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index d6eb50235..446931aca 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -581,6 +581,12 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Long</td>
<td>End condition "watermark" for bounded streaming mode. Stream
reading will end when a larger watermark snapshot is encountered.</td>
</tr>
+ <tr>
+ <td><h5>scan.fallback-branch</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>When a batch job queries from a table, if a partition does not
exist in the current branch, the reader will try to get this partition from
this fallback branch.</td>
+ </tr>
<tr>
<td><h5>scan.file-creation-time-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 685634e26..17bff3653 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1275,6 +1275,14 @@ public class CoreOptions implements Serializable {
"The maximum number of concurrent deleting files. "
+ "By default is the number of processors
available to the Java virtual machine.");
+ public static final ConfigOption<String> SCAN_FALLBACK_BRANCH =
+ key("scan.fallback-branch")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When a batch job queries from a table, if a
partition does not exist in the current branch, "
+ + "the reader will try to get this
partition from this fallback branch.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 45ce6f0bf..d590eb370 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -18,20 +18,15 @@
package org.apache.paimon.privilege;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Statistics;
-import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.DelegatedFileStoreTable;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
-import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.sink.WriteSelector;
@@ -40,55 +35,32 @@ import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.StreamDataTableScan;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.BranchManager;
-import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.OptionalLong;
/** {@link FileStoreTable} with privilege checks. */
-public class PrivilegedFileStoreTable implements FileStoreTable {
+public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
- private final FileStoreTable wrapped;
private final PrivilegeChecker privilegeChecker;
private final Identifier identifier;
public PrivilegedFileStoreTable(
FileStoreTable wrapped, PrivilegeChecker privilegeChecker,
Identifier identifier) {
- this.wrapped = wrapped;
+ super(wrapped);
this.privilegeChecker = privilegeChecker;
this.identifier = identifier;
}
- @Override
- public String name() {
- return wrapped.name();
- }
-
- @Override
- public String fullName() {
- return wrapped.fullName();
- }
-
@Override
public SnapshotReader newSnapshotReader() {
privilegeChecker.assertCanSelect(identifier);
return wrapped.newSnapshotReader();
}
- @Override
- public CoreOptions coreOptions() {
- return wrapped.coreOptions();
- }
-
- @Override
- public SnapshotManager snapshotManager() {
- return wrapped.snapshotManager();
- }
-
@Override
public TagManager tagManager() {
privilegeChecker.assertCanInsert(identifier);
@@ -102,36 +74,11 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
return wrapped.branchManager();
}
- @Override
- public Path location() {
- return wrapped.location();
- }
-
- @Override
- public FileIO fileIO() {
- return wrapped.fileIO();
- }
-
- @Override
- public TableSchema schema() {
- return wrapped.schema();
- }
-
@Override
public FileStore<?> store() {
return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker,
identifier);
}
- @Override
- public BucketMode bucketMode() {
- return wrapped.bucketMode();
- }
-
- @Override
- public CatalogEnvironment catalogEnvironment() {
- return wrapped.catalogEnvironment();
- }
-
@Override
public Optional<Statistics> statistics() {
privilegeChecker.assertCanSelect(identifier);
@@ -144,11 +91,6 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
wrapped.copy(dynamicOptions), privilegeChecker, identifier);
}
- @Override
- public OptionalLong latestSnapshotId() {
- return wrapped.latestSnapshotId();
- }
-
@Override
public FileStoreTable copy(TableSchema newTableSchema) {
return new PrivilegedFileStoreTable(
@@ -299,16 +241,6 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
return wrapped.newLocalTableQuery();
}
- @Override
- public boolean supportStreamingReadOverwrite() {
- return wrapped.supportStreamingReadOverwrite();
- }
-
- @Override
- public RowKeyExtractor createRowKeyExtractor() {
- return wrapped.createRowKeyExtractor();
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
similarity index 65%
copy from
paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
copy to
paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 45ce6f0bf..6a8e94240 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -16,20 +16,15 @@
* limitations under the License.
*/
-package org.apache.paimon.privilege;
+package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Statistics;
-import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.ExpireSnapshots;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
@@ -44,23 +39,17 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import java.time.Duration;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
-/** {@link FileStoreTable} with privilege checks. */
-public class PrivilegedFileStoreTable implements FileStoreTable {
+/** Delegated {@link FileStoreTable}. */
+public abstract class DelegatedFileStoreTable implements FileStoreTable {
- private final FileStoreTable wrapped;
- private final PrivilegeChecker privilegeChecker;
- private final Identifier identifier;
+ protected final FileStoreTable wrapped;
- public PrivilegedFileStoreTable(
- FileStoreTable wrapped, PrivilegeChecker privilegeChecker,
Identifier identifier) {
+ public DelegatedFileStoreTable(FileStoreTable wrapped) {
this.wrapped = wrapped;
- this.privilegeChecker = privilegeChecker;
- this.identifier = identifier;
}
@Override
@@ -75,7 +64,6 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
@Override
public SnapshotReader newSnapshotReader() {
- privilegeChecker.assertCanSelect(identifier);
return wrapped.newSnapshotReader();
}
@@ -91,14 +79,11 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
@Override
public TagManager tagManager() {
- privilegeChecker.assertCanInsert(identifier);
return wrapped.tagManager();
}
@Override
public BranchManager branchManager() {
- privilegeChecker.assertCanSelect(identifier);
- privilegeChecker.assertCanInsert(identifier);
return wrapped.branchManager();
}
@@ -119,7 +104,7 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
@Override
public FileStore<?> store() {
- return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker,
identifier);
+ return wrapped.store();
}
@Override
@@ -134,168 +119,121 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
@Override
public Optional<Statistics> statistics() {
- privilegeChecker.assertCanSelect(identifier);
return wrapped.statistics();
}
- @Override
- public FileStoreTable copy(Map<String, String> dynamicOptions) {
- return new PrivilegedFileStoreTable(
- wrapped.copy(dynamicOptions), privilegeChecker, identifier);
- }
-
@Override
public OptionalLong latestSnapshotId() {
return wrapped.latestSnapshotId();
}
- @Override
- public FileStoreTable copy(TableSchema newTableSchema) {
- return new PrivilegedFileStoreTable(
- wrapped.copy(newTableSchema), privilegeChecker, identifier);
- }
-
@Override
public void rollbackTo(long snapshotId) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.rollbackTo(snapshotId);
}
@Override
public void createTag(String tagName) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.createTag(tagName);
}
@Override
public void createTag(String tagName, long fromSnapshotId) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.createTag(tagName, fromSnapshotId);
}
@Override
public void createTag(String tagName, Duration timeRetained) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.createTag(tagName, timeRetained);
}
@Override
public void createTag(String tagName, long fromSnapshotId, Duration
timeRetained) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.createTag(tagName, fromSnapshotId, timeRetained);
}
@Override
public void deleteTag(String tagName) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.deleteTag(tagName);
}
@Override
public void rollbackTo(String tagName) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.rollbackTo(tagName);
}
@Override
public void createBranch(String branchName) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.createBranch(branchName);
}
@Override
public void createBranch(String branchName, long snapshotId) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.createBranch(branchName, snapshotId);
}
@Override
public void createBranch(String branchName, String tagName) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.createBranch(branchName, tagName);
}
@Override
public void deleteBranch(String branchName) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.deleteBranch(branchName);
}
@Override
public void fastForward(String branchName) {
- privilegeChecker.assertCanInsert(identifier);
wrapped.fastForward(branchName);
}
@Override
public ExpireSnapshots newExpireSnapshots() {
- privilegeChecker.assertCanInsert(identifier);
return wrapped.newExpireSnapshots();
}
@Override
public ExpireSnapshots newExpireChangelog() {
- privilegeChecker.assertCanInsert(identifier);
return wrapped.newExpireChangelog();
}
- @Override
- public FileStoreTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
- return new PrivilegedFileStoreTable(
- wrapped.copyWithoutTimeTravel(dynamicOptions),
privilegeChecker, identifier);
- }
-
- @Override
- public FileStoreTable copyWithLatestSchema() {
- return new PrivilegedFileStoreTable(
- wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
- }
-
@Override
public DataTableScan newScan() {
- privilegeChecker.assertCanSelect(identifier);
return wrapped.newScan();
}
@Override
public StreamDataTableScan newStreamScan() {
- privilegeChecker.assertCanSelect(identifier);
return wrapped.newStreamScan();
}
@Override
public InnerTableRead newRead() {
- privilegeChecker.assertCanSelect(identifier);
return wrapped.newRead();
}
@Override
public Optional<WriteSelector> newWriteSelector() {
- privilegeChecker.assertCanInsert(identifier);
return wrapped.newWriteSelector();
}
@Override
public TableWriteImpl<?> newWrite(String commitUser) {
- privilegeChecker.assertCanInsert(identifier);
return wrapped.newWrite(commitUser);
}
@Override
public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter
manifestFilter) {
- privilegeChecker.assertCanInsert(identifier);
return wrapped.newWrite(commitUser, manifestFilter);
}
@Override
public TableCommitImpl newCommit(String commitUser) {
- privilegeChecker.assertCanInsert(identifier);
return wrapped.newCommit(commitUser);
}
@Override
public LocalTableQuery newLocalTableQuery() {
- privilegeChecker.assertCanSelect(identifier);
return wrapped.newLocalTableQuery();
}
@@ -317,9 +255,7 @@ public class PrivilegedFileStoreTable implements
FileStoreTable {
if (o == null || getClass() != o.getClass()) {
return false;
}
- PrivilegedFileStoreTable that = (PrivilegedFileStoreTable) o;
- return Objects.equals(wrapped, that.wrapped)
- && Objects.equals(privilegeChecker, that.privilegeChecker)
- && Objects.equals(identifier, that.identifier);
+ DelegatedFileStoreTable that = (DelegatedFileStoreTable) o;
+ return Objects.equals(wrapped, that.wrapped);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
new file mode 100644
index 000000000..d26ce955a
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link FileStoreTable} which mainly reads from the current branch.
However, if the current
+ * branch does not have a partition, it will read that partition from the
fallback branch.
+ */
+public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
+
+ private final FileStoreTable fallback;
+
+ public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable
fallback) {
+ super(main);
+ this.fallback = fallback;
+
+ Preconditions.checkArgument(!(main instanceof
FallbackReadFileStoreTable));
+ Preconditions.checkArgument(!(fallback instanceof
FallbackReadFileStoreTable));
+
+ String mainBranch = main.coreOptions().branch();
+ String fallbackBranch = fallback.coreOptions().branch();
+ RowType mainRowType = main.schema().logicalRowType();
+ RowType fallbackRowType = fallback.schema().logicalRowType();
+ Preconditions.checkArgument(
+ mainRowType.equals(fallbackRowType),
+ "Branch %s and %s does not have the same row type.\n"
+ + "Row type of branch %s is %s.\n"
+ + "Row type of branch %s is %s.",
+ mainBranch,
+ fallbackBranch,
+ mainBranch,
+ mainRowType,
+ fallbackBranch,
+ fallbackRowType);
+
+ List<String> mainPrimaryKeys = main.schema().primaryKeys();
+ List<String> fallbackPrimaryKeys = fallback.schema().primaryKeys();
+ if (!mainPrimaryKeys.isEmpty()) {
+ if (fallbackPrimaryKeys.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Branch "
+ + mainBranch
+ + " has primary keys while fallback branch "
+ + fallbackBranch
+ + " does not. This is not allowed.");
+ }
+ Preconditions.checkArgument(
+ mainPrimaryKeys.equals(fallbackPrimaryKeys),
+ "Branch %s and %s both have primary keys but are not the
same.\n"
+ + "Primary keys of %s are %s.\n"
+ + "Primary keys of %s are %s.",
+ mainBranch,
+ fallbackBranch,
+ mainBranch,
+ mainPrimaryKeys,
+ fallbackBranch,
+ fallbackPrimaryKeys);
+ }
+ }
+
+ @Override
+ public FileStoreTable copy(Map<String, String> dynamicOptions) {
+ return new FallbackReadFileStoreTable(
+ wrapped.copy(dynamicOptions),
+ fallback.copy(rewriteFallbackOptions(dynamicOptions)));
+ }
+
+ @Override
+ public FileStoreTable copy(TableSchema newTableSchema) {
+ return new FallbackReadFileStoreTable(
+ wrapped.copy(newTableSchema),
+ fallback.copy(
+
newTableSchema.copy(rewriteFallbackOptions(newTableSchema.options()))));
+ }
+
+ @Override
+ public FileStoreTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
+ return new FallbackReadFileStoreTable(
+ wrapped.copyWithoutTimeTravel(dynamicOptions),
+
fallback.copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions)));
+ }
+
+ @Override
+ public FileStoreTable copyWithLatestSchema() {
+ return new FallbackReadFileStoreTable(
+ wrapped.copyWithLatestSchema(),
fallback.copyWithLatestSchema());
+ }
+
+ private Map<String, String> rewriteFallbackOptions(Map<String, String>
options) {
+ Map<String, String> result = new HashMap<>(options);
+
+ // snapshot ids may be different between the main branch and the
fallback branch,
+ // so we need to convert main branch snapshot id to millisecond,
+ // then convert millisecond to fallback branch snapshot id
+ String scanSnapshotIdOptionKey = CoreOptions.SCAN_SNAPSHOT_ID.key();
+ if (options.containsKey(scanSnapshotIdOptionKey)) {
+ long id = Long.parseLong(options.get(scanSnapshotIdOptionKey));
+ long millis = wrapped.snapshotManager().snapshot(id).timeMillis();
+ Snapshot fallbackSnapshot =
fallback.snapshotManager().earlierOrEqualTimeMills(millis);
+ long fallbackId;
+ if (fallbackSnapshot == null) {
+ fallbackId = Snapshot.FIRST_SNAPSHOT_ID;
+ } else {
+ fallbackId = fallbackSnapshot.id();
+ }
+ result.put(scanSnapshotIdOptionKey, String.valueOf(fallbackId));
+ }
+
+ // bucket number of main branch and fallback branch are very likely
different,
+ // so we remove bucket in options to use fallback branch's bucket
number
+ result.remove(CoreOptions.BUCKET.key());
+
+ return result;
+ }
+
+ @Override
+ public DataTableScan newScan() {
+ return new Scan();
+ }
+
+ private class Scan implements DataTableScan {
+
+ private final DataTableScan mainScan;
+ private final DataTableScan fallbackScan;
+
+ private Scan() {
+ this.mainScan = wrapped.newScan();
+ this.fallbackScan = fallback.newScan();
+ }
+
+ @Override
+ public Scan withShard(int indexOfThisSubtask, int
numberOfParallelSubtasks) {
+ mainScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
+ fallbackScan.withShard(indexOfThisSubtask,
numberOfParallelSubtasks);
+ return this;
+ }
+
+ @Override
+ public Scan withFilter(Predicate predicate) {
+ mainScan.withFilter(predicate);
+ fallbackScan.withFilter(predicate);
+ return this;
+ }
+
+ @Override
+ public Scan withLimit(int limit) {
+ mainScan.withLimit(limit);
+ fallbackScan.withLimit(limit);
+ return this;
+ }
+
+ @Override
+ public Scan withPartitionFilter(Map<String, String> partitionSpec) {
+ mainScan.withPartitionFilter(partitionSpec);
+ fallbackScan.withPartitionFilter(partitionSpec);
+ return this;
+ }
+
+ @Override
+ public Scan withPartitionFilter(List<BinaryRow> partitions) {
+ mainScan.withPartitionFilter(partitions);
+ fallbackScan.withPartitionFilter(partitions);
+ return this;
+ }
+
+ @Override
+ public Scan withBucketFilter(Filter<Integer> bucketFilter) {
+ mainScan.withBucketFilter(bucketFilter);
+ fallbackScan.withBucketFilter(bucketFilter);
+ return this;
+ }
+
+ @Override
+ public Scan withLevelFilter(Filter<Integer> levelFilter) {
+ mainScan.withLevelFilter(levelFilter);
+ fallbackScan.withLevelFilter(levelFilter);
+ return this;
+ }
+
+ @Override
+ public Scan withMetricsRegistry(MetricRegistry metricRegistry) {
+ mainScan.withMetricsRegistry(metricRegistry);
+ fallbackScan.withMetricsRegistry(metricRegistry);
+ return this;
+ }
+
+ @Override
+ public TableScan.Plan plan() {
+ List<DataSplit> splits = new ArrayList<>();
+ Set<BinaryRow> completePartitions = new HashSet<>();
+ for (Split split : mainScan.plan().splits()) {
+ DataSplit dataSplit = (DataSplit) split;
+ splits.add(dataSplit);
+ completePartitions.add(dataSplit.partition());
+ }
+
+ List<BinaryRow> remainingPartitions =
+ fallbackScan.listPartitions().stream()
+ .filter(p -> !completePartitions.contains(p))
+ .collect(Collectors.toList());
+ if (!remainingPartitions.isEmpty()) {
+ fallbackScan.withPartitionFilter(remainingPartitions);
+ for (Split split : fallbackScan.plan().splits()) {
+ splits.add((DataSplit) split);
+ }
+ }
+ return new DataFilePlan(splits);
+ }
+
+ @Override
+ public List<BinaryRow> listPartitions() {
+ Set<BinaryRow> partitions = new
LinkedHashSet<>(mainScan.listPartitions());
+ partitions.addAll(fallbackScan.listPartitions());
+ return new ArrayList<>(partitions);
+ }
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ return new Read();
+ }
+
+ private class Read implements InnerTableRead {
+
+ private final InnerTableRead mainRead;
+ private final InnerTableRead fallbackRead;
+
+ private Read() {
+ this.mainRead = wrapped.newRead();
+ this.fallbackRead = fallback.newRead();
+ }
+
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ mainRead.withFilter(predicate);
+ fallbackRead.withFilter(predicate);
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withProjection(int[][] projection) {
+ mainRead.withProjection(projection);
+ fallbackRead.withProjection(projection);
+ return this;
+ }
+
+ @Override
+ public InnerTableRead forceKeepDelete() {
+ mainRead.forceKeepDelete();
+ fallbackRead.forceKeepDelete();
+ return this;
+ }
+
+ @Override
+ public TableRead executeFilter() {
+ mainRead.executeFilter();
+ fallbackRead.executeFilter();
+ return this;
+ }
+
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ mainRead.withIOManager(ioManager);
+ fallbackRead.withIOManager(ioManager);
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ DataSplit dataSplit = (DataSplit) split;
+ if (!dataSplit.dataFiles().isEmpty()
+ && dataSplit.dataFiles().get(0).minKey().getFieldCount() >
0) {
+ return fallbackRead.createReader(split);
+ } else {
+ return mainRead.createReader(split);
+ }
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index e124fbb27..58449c9d7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.StringUtils;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -83,6 +84,34 @@ public class FileStoreTableFactory {
TableSchema tableSchema,
Options dynamicOptions,
CatalogEnvironment catalogEnvironment) {
+ FileStoreTable table =
+ createWithoutFallbackBranch(
+ fileIO, tablePath, tableSchema, dynamicOptions,
catalogEnvironment);
+
+ Options options = new Options(table.options());
+ String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
+ if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
+ Options branchOptions = new Options();
+ branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
+ FileStoreTable fallbackTable =
+ createWithoutFallbackBranch(
+ fileIO,
+ tablePath,
+ new SchemaManager(fileIO, tablePath,
fallbackBranch).latest().get(),
+ branchOptions,
+ catalogEnvironment);
+ table = new FallbackReadFileStoreTable(table, fallbackTable);
+ }
+
+ return table;
+ }
+
+ private static FileStoreTable createWithoutFallbackBranch(
+ FileIO fileIO,
+ Path tablePath,
+ TableSchema tableSchema,
+ Options dynamicOptions,
+ CatalogEnvironment catalogEnvironment) {
FileStoreTable table =
tableSchema.primaryKeys().isEmpty()
? new AppendOnlyFileStoreTable(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 708e25c1e..62207f882 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -112,7 +112,7 @@ public interface Table extends Serializable {
@Experimental
void rollbackTo(String tagName);
- /** Create a empty branch. */
+ /** Create an empty branch. */
@Experimental
void createBranch(String branchName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 426620b51..531c4945b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -85,6 +85,12 @@ public abstract class AbstractDataTableScan implements DataTableScan {
return this;
}
+ @Override
+ public AbstractDataTableScan withPartitionFilter(List<BinaryRow> partitions) {
+ snapshotReader.withPartitionFilter(partitions);
+ return this;
+ }
+
@Override
public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
snapshotReader.withLevelFilter(levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 613b2efc2..00a4fc0cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -18,10 +18,12 @@
package org.apache.paimon.table.source;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.utils.Filter;
+import java.util.List;
import java.util.Map;
/** Inner {@link TableScan} contains filter push down. */
@@ -37,6 +39,10 @@ public interface InnerTableScan extends TableScan {
return this;
}
+ default InnerTableScan withPartitionFilter(List<BinaryRow> partitions) {
+ return this;
+ }
+
default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
return this;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 8db3effd1..c2439de55 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -56,6 +56,8 @@ public interface SnapshotReader {
SnapshotReader withPartitionFilter(Predicate predicate);
+ SnapshotReader withPartitionFilter(List<BinaryRow> partitions);
+
SnapshotReader withMode(ScanMode scanMode);
SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 1b58fea91..25c3ffa96 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -157,6 +157,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
return this;
}
+ @Override
+ public SnapshotReader withPartitionFilter(List<BinaryRow> partitions) {
+ scan.withPartitionFilter(partitions);
+ return this;
+ }
+
@Override
public SnapshotReader withFilter(Predicate predicate) {
List<String> partitionKeys = tableSchema.partitionKeys();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 0b922d77b..7192c3630 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -253,6 +253,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return this;
}
+ @Override
+ public SnapshotReader withPartitionFilter(List<BinaryRow> partitions) {
+ snapshotReader.withPartitionFilter(partitions);
+ return this;
+ }
+
@Override
public SnapshotReader withMode(ScanMode scanMode) {
snapshotReader.withMode(scanMode);
@@ -358,6 +364,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return this;
}
+ @Override
+ public InnerTableScan withPartitionFilter(List<BinaryRow> partitions) {
+ batchScan.withPartitionFilter(partitions);
+ return this;
+ }
+
@Override
public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
batchScan.withBucketFilter(bucketFilter);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index cddfaf936..80ca03d8c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -32,13 +32,13 @@ import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
/** IT cases for table with branches using SQL. */
public class BranchSqlITCase extends CatalogITCaseBase {
@Test
public void testAlterBranchTable() throws Exception {
-
sql(
"CREATE TABLE T ("
+ " pt INT"
@@ -313,6 +313,80 @@ public class BranchSqlITCase extends CatalogITCaseBase {
checkSnapshots(snapshotManager, 1, 2);
}
+ @Test
+ public void testFallbackBranchBatchRead() throws Exception {
+ sql(
+ "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
+ sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')");
+
+ sql("CALL sys.create_branch('default.t', 'pk')");
+ sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
+ sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+
+ sql("INSERT INTO `t$branch_pk` VALUES (1, 20, 'cat'), (1, 30, 'dog')");
+ assertThat(collectResult("SELECT v, k FROM t"))
+ .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]");
+ assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+ .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
+
+ sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')");
+ assertThat(collectResult("SELECT v, k FROM t"))
+ .containsExactlyInAnyOrder(
+ "+I[apple, 10]", "+I[banana, 20]", "+I[tiger, 10]", "+I[wolf, 20]");
+ assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+ .containsExactlyInAnyOrder(
+ "+I[cat, 20]", "+I[dog, 30]", "+I[tiger, 10]", "+I[wolf, 20]");
+ assertThat(collectResult("SELECT v, k FROM t WHERE pt = 1"))
+ .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]");
+ assertThat(collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 1"))
+ .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
+ assertThat(collectResult("SELECT v, k FROM t WHERE pt = 2"))
+ .containsExactlyInAnyOrder("+I[tiger, 10]", "+I[wolf, 20]");
+ assertThat(collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 2"))
+ .containsExactlyInAnyOrder("+I[tiger, 10]", "+I[wolf, 20]");
+
+ sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'lion')");
+ assertThat(collectResult("SELECT v, k FROM t"))
+ .containsExactlyInAnyOrder(
+ "+I[apple, 10]", "+I[banana, 20]", "+I[lion, 10]", "+I[wolf, 20]");
+ assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+ .containsExactlyInAnyOrder(
+ "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]");
+
+ sql("INSERT OVERWRITE t PARTITION (pt = 1) VALUES (10, 'pear'), (20, 'mango')");
+ assertThat(collectResult("SELECT v, k FROM t"))
+ .containsExactlyInAnyOrder(
+ "+I[pear, 10]", "+I[mango, 20]", "+I[lion, 10]", "+I[wolf, 20]");
+ assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+ .containsExactlyInAnyOrder(
+ "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]");
+
+ sql("ALTER TABLE t RESET ( 'scan.fallback-branch' )");
+ assertThat(collectResult("SELECT v, k FROM t"))
+ .containsExactlyInAnyOrder("+I[pear, 10]", "+I[mango, 20]");
+ assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+ .containsExactlyInAnyOrder(
+ "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]");
+ }
+
+ @Test
+ public void testDifferentRowTypes() throws Exception {
+ sql(
+ "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
+ sql("CALL sys.create_branch('default.t', 'pk')");
+ sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
+ sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)");
+ sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+
+ try {
+ sql("INSERT INTO t VALUES (1, 10, 'apple')");
+ fail("Expecting exceptions");
+ } catch (Exception e) {
+ assertThat(e)
+ .hasMessageContaining("Branch main and pk does not have the same row type");
+ }
+ }
+
private List<String> collectResult(String sql) throws Exception {
List<String> result = new ArrayList<>();
try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {