This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e917af004 [core] Support reading partition from fallback branch when 
not found in current branch (#3816)
e917af004 is described below

commit e917af004d76366080e3ca014d49f683386aad98
Author: tsreaper <[email protected]>
AuthorDate: Wed Jul 31 14:36:37 2024 +0800

    [core] Support reading partition from fallback branch when not found in 
current branch (#3816)
---
 docs/content/maintenance/manage-branches.md        |  83 ++++++
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |   8 +
 .../paimon/privilege/PrivilegedFileStoreTable.java |  74 +----
 .../DelegatedFileStoreTable.java}                  |  80 +----
 .../paimon/table/FallbackReadFileStoreTable.java   | 325 +++++++++++++++++++++
 .../apache/paimon/table/FileStoreTableFactory.java |  29 ++
 .../main/java/org/apache/paimon/table/Table.java   |   2 +-
 .../paimon/table/source/AbstractDataTableScan.java |   6 +
 .../apache/paimon/table/source/InnerTableScan.java |   6 +
 .../table/source/snapshot/SnapshotReader.java      |   2 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |   6 +
 .../apache/paimon/table/system/AuditLogTable.java  |  12 +
 .../org/apache/paimon/flink/BranchSqlITCase.java   |  76 ++++-
 14 files changed, 570 insertions(+), 145 deletions(-)

diff --git a/docs/content/maintenance/manage-branches.md 
b/docs/content/maintenance/manage-branches.md
index 4343214f2..22ca9b850 100644
--- a/docs/content/maintenance/manage-branches.md
+++ b/docs/content/maintenance/manage-branches.md
@@ -159,3 +159,86 @@ Run the following command:
 {{< /tab >}}
 
 {{< /tabs >}}
+
+### Batch Reading from Fallback Branch
+
+You can set the table option `scan.fallback-branch`
+so that when a batch job reads from the current branch and a partition does 
not exist there,
+the reader will try to read this partition from the fallback branch.
+This feature is currently not supported for streaming read jobs, which will 
only produce results from the current branch.
+
+What's the use case of this feature? Say you have created a Paimon table 
partitioned by date.
+You have a long-running streaming job which inserts records into Paimon, so 
that today's data can be queried in time.
+You also have a batch job which runs every night to insert corrected 
records of yesterday into Paimon,
+so that the accuracy of the data can be guaranteed.
+
+When you query from this Paimon table, you would like to first read from the 
results of the batch job.
+But if a partition (for example, today's partition) does not exist in its 
result,
+then you would like to read from the results of the streaming job.
+In this case, you can create a branch for the streaming job, and set 
`scan.fallback-branch` to this streaming branch.
+
+Let's look at an example.
+
+{{< tabs "read-fallback-branch" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+-- create Paimon table
+CREATE TABLE T (
+    dt STRING NOT NULL,
+    name STRING NOT NULL,
+    amount BIGINT
+) PARTITIONED BY (dt);
+
+-- create a branch for streaming job
+CALL sys.create_branch('default.T', 'test');
+
+-- set primary key and bucket number for the branch
+ALTER TABLE `T$branch_test` SET (
+    'primary-key' = 'dt,name',
+    'bucket' = '2',
+    'changelog-producer' = 'lookup'
+);
+
+-- set fallback branch
+ALTER TABLE T SET (
+    'scan.fallback-branch' = 'test'
+);
+
+-- write records into the streaming branch
+INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 
'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6);
+
+-- write records into the default branch
+INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7);
+
+SELECT * FROM T;
+/*
++------------------+------------------+--------+
+|               dt |             name | amount |
++------------------+------------------+--------+
+|         20240725 |            apple |      5 |
+|         20240725 |           banana |      7 |
+|         20240726 |           cherry |      3 |
+|         20240726 |             pear |      6 |
++------------------+------------------+--------+
+*/
+
+-- reset fallback branch
+ALTER TABLE T RESET ( 'scan.fallback-branch' );
+
+-- now it only reads from default branch
+SELECT * FROM T;
+/*
++------------------+------------------+--------+
+|               dt |             name | amount |
++------------------+------------------+--------+
+|         20240725 |            apple |      5 |
+|         20240725 |           banana |      7 |
++------------------+------------------+--------+
+*/
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index d6eb50235..446931aca 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -581,6 +581,12 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Long</td>
             <td>End condition "watermark" for bounded streaming mode. Stream 
reading will end when a larger watermark snapshot is encountered.</td>
         </tr>
+        <tr>
+            <td><h5>scan.fallback-branch</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>When a batch job queries from a table, if a partition does not 
exist in the current branch, the reader will try to get this partition from 
this fallback branch.</td>
+        </tr>
         <tr>
             <td><h5>scan.file-creation-time-millis</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 685634e26..17bff3653 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1275,6 +1275,14 @@ public class CoreOptions implements Serializable {
                             "The maximum number of concurrent deleting files. "
                                     + "By default is the number of processors 
available to the Java virtual machine.");
 
+    public static final ConfigOption<String> SCAN_FALLBACK_BRANCH =
+            key("scan.fallback-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When a batch job queries from a table, if a 
partition does not exist in the current branch, "
+                                    + "the reader will try to get this 
partition from this fallback branch.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
index 45ce6f0bf..d590eb370 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
@@ -18,20 +18,15 @@
 
 package org.apache.paimon.privilege;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.Statistics;
-import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.DelegatedFileStoreTable;
 import org.apache.paimon.table.ExpireSnapshots;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.query.LocalTableQuery;
-import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.sink.WriteSelector;
@@ -40,55 +35,32 @@ import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.StreamDataTableScan;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.BranchManager;
-import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
 import java.time.Duration;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.OptionalLong;
 
 /** {@link FileStoreTable} with privilege checks. */
-public class PrivilegedFileStoreTable implements FileStoreTable {
+public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {
 
-    private final FileStoreTable wrapped;
     private final PrivilegeChecker privilegeChecker;
     private final Identifier identifier;
 
     public PrivilegedFileStoreTable(
             FileStoreTable wrapped, PrivilegeChecker privilegeChecker, 
Identifier identifier) {
-        this.wrapped = wrapped;
+        super(wrapped);
         this.privilegeChecker = privilegeChecker;
         this.identifier = identifier;
     }
 
-    @Override
-    public String name() {
-        return wrapped.name();
-    }
-
-    @Override
-    public String fullName() {
-        return wrapped.fullName();
-    }
-
     @Override
     public SnapshotReader newSnapshotReader() {
         privilegeChecker.assertCanSelect(identifier);
         return wrapped.newSnapshotReader();
     }
 
-    @Override
-    public CoreOptions coreOptions() {
-        return wrapped.coreOptions();
-    }
-
-    @Override
-    public SnapshotManager snapshotManager() {
-        return wrapped.snapshotManager();
-    }
-
     @Override
     public TagManager tagManager() {
         privilegeChecker.assertCanInsert(identifier);
@@ -102,36 +74,11 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
         return wrapped.branchManager();
     }
 
-    @Override
-    public Path location() {
-        return wrapped.location();
-    }
-
-    @Override
-    public FileIO fileIO() {
-        return wrapped.fileIO();
-    }
-
-    @Override
-    public TableSchema schema() {
-        return wrapped.schema();
-    }
-
     @Override
     public FileStore<?> store() {
         return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker, 
identifier);
     }
 
-    @Override
-    public BucketMode bucketMode() {
-        return wrapped.bucketMode();
-    }
-
-    @Override
-    public CatalogEnvironment catalogEnvironment() {
-        return wrapped.catalogEnvironment();
-    }
-
     @Override
     public Optional<Statistics> statistics() {
         privilegeChecker.assertCanSelect(identifier);
@@ -144,11 +91,6 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
                 wrapped.copy(dynamicOptions), privilegeChecker, identifier);
     }
 
-    @Override
-    public OptionalLong latestSnapshotId() {
-        return wrapped.latestSnapshotId();
-    }
-
     @Override
     public FileStoreTable copy(TableSchema newTableSchema) {
         return new PrivilegedFileStoreTable(
@@ -299,16 +241,6 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
         return wrapped.newLocalTableQuery();
     }
 
-    @Override
-    public boolean supportStreamingReadOverwrite() {
-        return wrapped.supportStreamingReadOverwrite();
-    }
-
-    @Override
-    public RowKeyExtractor createRowKeyExtractor() {
-        return wrapped.createRowKeyExtractor();
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
similarity index 65%
copy from 
paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 45ce6f0bf..6a8e94240 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -16,20 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.privilege;
+package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
-import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.Statistics;
-import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.ExpireSnapshots;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.query.LocalTableQuery;
 import org.apache.paimon.table.sink.RowKeyExtractor;
 import org.apache.paimon.table.sink.TableCommitImpl;
@@ -44,23 +39,17 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
 
 import java.time.Duration;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalLong;
 
-/** {@link FileStoreTable} with privilege checks. */
-public class PrivilegedFileStoreTable implements FileStoreTable {
+/** Delegated {@link FileStoreTable}. */
+public abstract class DelegatedFileStoreTable implements FileStoreTable {
 
-    private final FileStoreTable wrapped;
-    private final PrivilegeChecker privilegeChecker;
-    private final Identifier identifier;
+    protected final FileStoreTable wrapped;
 
-    public PrivilegedFileStoreTable(
-            FileStoreTable wrapped, PrivilegeChecker privilegeChecker, 
Identifier identifier) {
+    public DelegatedFileStoreTable(FileStoreTable wrapped) {
         this.wrapped = wrapped;
-        this.privilegeChecker = privilegeChecker;
-        this.identifier = identifier;
     }
 
     @Override
@@ -75,7 +64,6 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
 
     @Override
     public SnapshotReader newSnapshotReader() {
-        privilegeChecker.assertCanSelect(identifier);
         return wrapped.newSnapshotReader();
     }
 
@@ -91,14 +79,11 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
 
     @Override
     public TagManager tagManager() {
-        privilegeChecker.assertCanInsert(identifier);
         return wrapped.tagManager();
     }
 
     @Override
     public BranchManager branchManager() {
-        privilegeChecker.assertCanSelect(identifier);
-        privilegeChecker.assertCanInsert(identifier);
         return wrapped.branchManager();
     }
 
@@ -119,7 +104,7 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
 
     @Override
     public FileStore<?> store() {
-        return new PrivilegedFileStore<>(wrapped.store(), privilegeChecker, 
identifier);
+        return wrapped.store();
     }
 
     @Override
@@ -134,168 +119,121 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
 
     @Override
     public Optional<Statistics> statistics() {
-        privilegeChecker.assertCanSelect(identifier);
         return wrapped.statistics();
     }
 
-    @Override
-    public FileStoreTable copy(Map<String, String> dynamicOptions) {
-        return new PrivilegedFileStoreTable(
-                wrapped.copy(dynamicOptions), privilegeChecker, identifier);
-    }
-
     @Override
     public OptionalLong latestSnapshotId() {
         return wrapped.latestSnapshotId();
     }
 
-    @Override
-    public FileStoreTable copy(TableSchema newTableSchema) {
-        return new PrivilegedFileStoreTable(
-                wrapped.copy(newTableSchema), privilegeChecker, identifier);
-    }
-
     @Override
     public void rollbackTo(long snapshotId) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.rollbackTo(snapshotId);
     }
 
     @Override
     public void createTag(String tagName) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.createTag(tagName);
     }
 
     @Override
     public void createTag(String tagName, long fromSnapshotId) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.createTag(tagName, fromSnapshotId);
     }
 
     @Override
     public void createTag(String tagName, Duration timeRetained) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.createTag(tagName, timeRetained);
     }
 
     @Override
     public void createTag(String tagName, long fromSnapshotId, Duration 
timeRetained) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.createTag(tagName, fromSnapshotId, timeRetained);
     }
 
     @Override
     public void deleteTag(String tagName) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.deleteTag(tagName);
     }
 
     @Override
     public void rollbackTo(String tagName) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.rollbackTo(tagName);
     }
 
     @Override
     public void createBranch(String branchName) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.createBranch(branchName);
     }
 
     @Override
     public void createBranch(String branchName, long snapshotId) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.createBranch(branchName, snapshotId);
     }
 
     @Override
     public void createBranch(String branchName, String tagName) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.createBranch(branchName, tagName);
     }
 
     @Override
     public void deleteBranch(String branchName) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.deleteBranch(branchName);
     }
 
     @Override
     public void fastForward(String branchName) {
-        privilegeChecker.assertCanInsert(identifier);
         wrapped.fastForward(branchName);
     }
 
     @Override
     public ExpireSnapshots newExpireSnapshots() {
-        privilegeChecker.assertCanInsert(identifier);
         return wrapped.newExpireSnapshots();
     }
 
     @Override
     public ExpireSnapshots newExpireChangelog() {
-        privilegeChecker.assertCanInsert(identifier);
         return wrapped.newExpireChangelog();
     }
 
-    @Override
-    public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
-        return new PrivilegedFileStoreTable(
-                wrapped.copyWithoutTimeTravel(dynamicOptions), 
privilegeChecker, identifier);
-    }
-
-    @Override
-    public FileStoreTable copyWithLatestSchema() {
-        return new PrivilegedFileStoreTable(
-                wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
-    }
-
     @Override
     public DataTableScan newScan() {
-        privilegeChecker.assertCanSelect(identifier);
         return wrapped.newScan();
     }
 
     @Override
     public StreamDataTableScan newStreamScan() {
-        privilegeChecker.assertCanSelect(identifier);
         return wrapped.newStreamScan();
     }
 
     @Override
     public InnerTableRead newRead() {
-        privilegeChecker.assertCanSelect(identifier);
         return wrapped.newRead();
     }
 
     @Override
     public Optional<WriteSelector> newWriteSelector() {
-        privilegeChecker.assertCanInsert(identifier);
         return wrapped.newWriteSelector();
     }
 
     @Override
     public TableWriteImpl<?> newWrite(String commitUser) {
-        privilegeChecker.assertCanInsert(identifier);
         return wrapped.newWrite(commitUser);
     }
 
     @Override
     public TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter) {
-        privilegeChecker.assertCanInsert(identifier);
         return wrapped.newWrite(commitUser, manifestFilter);
     }
 
     @Override
     public TableCommitImpl newCommit(String commitUser) {
-        privilegeChecker.assertCanInsert(identifier);
         return wrapped.newCommit(commitUser);
     }
 
     @Override
     public LocalTableQuery newLocalTableQuery() {
-        privilegeChecker.assertCanSelect(identifier);
         return wrapped.newLocalTableQuery();
     }
 
@@ -317,9 +255,7 @@ public class PrivilegedFileStoreTable implements 
FileStoreTable {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        PrivilegedFileStoreTable that = (PrivilegedFileStoreTable) o;
-        return Objects.equals(wrapped, that.wrapped)
-                && Objects.equals(privilegeChecker, that.privilegeChecker)
-                && Objects.equals(identifier, that.identifier);
+        DelegatedFileStoreTable that = (DelegatedFileStoreTable) o;
+        return Objects.equals(wrapped, that.wrapped);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
new file mode 100644
index 000000000..d26ce955a
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link FileStoreTable} which mainly reads from the current branch. 
However, if the current
+ * branch does not have a partition, it will read that partition from the 
fallback branch.
+ */
+public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
+
+    private final FileStoreTable fallback;
+
+    public FallbackReadFileStoreTable(FileStoreTable main, FileStoreTable 
fallback) {
+        super(main);
+        this.fallback = fallback;
+
+        Preconditions.checkArgument(!(main instanceof 
FallbackReadFileStoreTable));
+        Preconditions.checkArgument(!(fallback instanceof 
FallbackReadFileStoreTable));
+
+        String mainBranch = main.coreOptions().branch();
+        String fallbackBranch = fallback.coreOptions().branch();
+        RowType mainRowType = main.schema().logicalRowType();
+        RowType fallbackRowType = fallback.schema().logicalRowType();
+        Preconditions.checkArgument(
+                mainRowType.equals(fallbackRowType),
+                "Branch %s and %s does not have the same row type.\n"
+                        + "Row type of branch %s is %s.\n"
+                        + "Row type of branch %s is %s.",
+                mainBranch,
+                fallbackBranch,
+                mainBranch,
+                mainRowType,
+                fallbackBranch,
+                fallbackRowType);
+
+        List<String> mainPrimaryKeys = main.schema().primaryKeys();
+        List<String> fallbackPrimaryKeys = fallback.schema().primaryKeys();
+        if (!mainPrimaryKeys.isEmpty()) {
+            if (fallbackPrimaryKeys.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "Branch "
+                                + mainBranch
+                                + " has primary keys while fallback branch "
+                                + fallbackBranch
+                                + " does not. This is not allowed.");
+            }
+            Preconditions.checkArgument(
+                    mainPrimaryKeys.equals(fallbackPrimaryKeys),
+                    "Branch %s and %s both have primary keys but are not the 
same.\n"
+                            + "Primary keys of %s are %s.\n"
+                            + "Primary keys of %s are %s.",
+                    mainBranch,
+                    fallbackBranch,
+                    mainBranch,
+                    mainPrimaryKeys,
+                    fallbackBranch,
+                    fallbackPrimaryKeys);
+        }
+    }
+
+    @Override
+    public FileStoreTable copy(Map<String, String> dynamicOptions) {
+        return new FallbackReadFileStoreTable(
+                wrapped.copy(dynamicOptions),
+                fallback.copy(rewriteFallbackOptions(dynamicOptions)));
+    }
+
+    @Override
+    public FileStoreTable copy(TableSchema newTableSchema) {
+        return new FallbackReadFileStoreTable(
+                wrapped.copy(newTableSchema),
+                fallback.copy(
+                        
newTableSchema.copy(rewriteFallbackOptions(newTableSchema.options()))));
+    }
+
+    @Override
+    public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
+        return new FallbackReadFileStoreTable(
+                wrapped.copyWithoutTimeTravel(dynamicOptions),
+                
fallback.copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions)));
+    }
+
+    @Override
+    public FileStoreTable copyWithLatestSchema() {
+        return new FallbackReadFileStoreTable(
+                wrapped.copyWithLatestSchema(), 
fallback.copyWithLatestSchema());
+    }
+
+    private Map<String, String> rewriteFallbackOptions(Map<String, String> 
options) {
+        Map<String, String> result = new HashMap<>(options);
+
+        // snapshot ids may be different between the main branch and the 
fallback branch,
+        // so we need to convert main branch snapshot id to millisecond,
+        // then convert millisecond to fallback branch snapshot id
+        String scanSnapshotIdOptionKey = CoreOptions.SCAN_SNAPSHOT_ID.key();
+        if (options.containsKey(scanSnapshotIdOptionKey)) {
+            long id = Long.parseLong(options.get(scanSnapshotIdOptionKey));
+            long millis = wrapped.snapshotManager().snapshot(id).timeMillis();
+            Snapshot fallbackSnapshot = 
fallback.snapshotManager().earlierOrEqualTimeMills(millis);
+            long fallbackId;
+            if (fallbackSnapshot == null) {
+                fallbackId = Snapshot.FIRST_SNAPSHOT_ID;
+            } else {
+                fallbackId = fallbackSnapshot.id();
+            }
+            result.put(scanSnapshotIdOptionKey, String.valueOf(fallbackId));
+        }
+
+        // bucket number of main branch and fallback branch are very likely 
different,
+        // so we remove bucket in options to use fallback branch's bucket 
number
+        result.remove(CoreOptions.BUCKET.key());
+
+        return result;
+    }
+
+    @Override
+    public DataTableScan newScan() {
+        return new Scan();
+    }
+
+    private class Scan implements DataTableScan {
+
+        private final DataTableScan mainScan;
+        private final DataTableScan fallbackScan;
+
+        private Scan() {
+            this.mainScan = wrapped.newScan();
+            this.fallbackScan = fallback.newScan();
+        }
+
+        @Override
+        public Scan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+            mainScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
+            fallbackScan.withShard(indexOfThisSubtask, 
numberOfParallelSubtasks);
+            return this;
+        }
+
+        @Override
+        public Scan withFilter(Predicate predicate) {
+            mainScan.withFilter(predicate);
+            fallbackScan.withFilter(predicate);
+            return this;
+        }
+
+        @Override
+        public Scan withLimit(int limit) {
+            mainScan.withLimit(limit);
+            fallbackScan.withLimit(limit);
+            return this;
+        }
+
+        @Override
+        public Scan withPartitionFilter(Map<String, String> partitionSpec) {
+            mainScan.withPartitionFilter(partitionSpec);
+            fallbackScan.withPartitionFilter(partitionSpec);
+            return this;
+        }
+
+        @Override
+        public Scan withPartitionFilter(List<BinaryRow> partitions) {
+            mainScan.withPartitionFilter(partitions);
+            fallbackScan.withPartitionFilter(partitions);
+            return this;
+        }
+
+        @Override
+        public Scan withBucketFilter(Filter<Integer> bucketFilter) {
+            mainScan.withBucketFilter(bucketFilter);
+            fallbackScan.withBucketFilter(bucketFilter);
+            return this;
+        }
+
+        @Override
+        public Scan withLevelFilter(Filter<Integer> levelFilter) {
+            mainScan.withLevelFilter(levelFilter);
+            fallbackScan.withLevelFilter(levelFilter);
+            return this;
+        }
+
+        @Override
+        public Scan withMetricsRegistry(MetricRegistry metricRegistry) {
+            mainScan.withMetricsRegistry(metricRegistry);
+            fallbackScan.withMetricsRegistry(metricRegistry);
+            return this;
+        }
+
+        @Override
+        public TableScan.Plan plan() {
+            List<DataSplit> splits = new ArrayList<>();
+            Set<BinaryRow> completePartitions = new HashSet<>();
+            for (Split split : mainScan.plan().splits()) {
+                DataSplit dataSplit = (DataSplit) split;
+                splits.add(dataSplit);
+                completePartitions.add(dataSplit.partition());
+            }
+
+            List<BinaryRow> remainingPartitions =
+                    fallbackScan.listPartitions().stream()
+                            .filter(p -> !completePartitions.contains(p))
+                            .collect(Collectors.toList());
+            if (!remainingPartitions.isEmpty()) {
+                fallbackScan.withPartitionFilter(remainingPartitions);
+                for (Split split : fallbackScan.plan().splits()) {
+                    splits.add((DataSplit) split);
+                }
+            }
+            return new DataFilePlan(splits);
+        }
+
+        @Override
+        public List<BinaryRow> listPartitions() {
+            Set<BinaryRow> partitions = new 
LinkedHashSet<>(mainScan.listPartitions());
+            partitions.addAll(fallbackScan.listPartitions());
+            return new ArrayList<>(partitions);
+        }
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new Read();
+    }
+
+    private class Read implements InnerTableRead {
+
+        private final InnerTableRead mainRead;
+        private final InnerTableRead fallbackRead;
+
+        private Read() {
+            this.mainRead = wrapped.newRead();
+            this.fallbackRead = fallback.newRead();
+        }
+
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            mainRead.withFilter(predicate);
+            fallbackRead.withFilter(predicate);
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withProjection(int[][] projection) {
+            mainRead.withProjection(projection);
+            fallbackRead.withProjection(projection);
+            return this;
+        }
+
+        @Override
+        public InnerTableRead forceKeepDelete() {
+            mainRead.forceKeepDelete();
+            fallbackRead.forceKeepDelete();
+            return this;
+        }
+
+        @Override
+        public TableRead executeFilter() {
+            mainRead.executeFilter();
+            fallbackRead.executeFilter();
+            return this;
+        }
+
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            mainRead.withIOManager(ioManager);
+            fallbackRead.withIOManager(ioManager);
+            return this;
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+            DataSplit dataSplit = (DataSplit) split;
+            if (!dataSplit.dataFiles().isEmpty()
+                    && dataSplit.dataFiles().get(0).minKey().getFieldCount() > 
0) {
+                return fallbackRead.createReader(split);
+            } else {
+                return mainRead.createReader(split);
+            }
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index e124fbb27..58449c9d7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.StringUtils;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -83,6 +84,34 @@ public class FileStoreTableFactory {
             TableSchema tableSchema,
             Options dynamicOptions,
             CatalogEnvironment catalogEnvironment) {
+        FileStoreTable table =
+                createWithoutFallbackBranch(
+                        fileIO, tablePath, tableSchema, dynamicOptions, catalogEnvironment);
+
+        Options options = new Options(table.options());
+        String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
+        if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
+            Options branchOptions = new Options();
+            branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
+            FileStoreTable fallbackTable =
+                    createWithoutFallbackBranch(
+                            fileIO,
+                            tablePath,
+                            new SchemaManager(fileIO, tablePath, fallbackBranch).latest().get(),
+                            branchOptions,
+                            catalogEnvironment);
+            table = new FallbackReadFileStoreTable(table, fallbackTable);
+        }
+
+        return table;
+    }
+
+    private static FileStoreTable createWithoutFallbackBranch(
+            FileIO fileIO,
+            Path tablePath,
+            TableSchema tableSchema,
+            Options dynamicOptions,
+            CatalogEnvironment catalogEnvironment) {
         FileStoreTable table =
                 tableSchema.primaryKeys().isEmpty()
                         ? new AppendOnlyFileStoreTable(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
index 708e25c1e..62207f882 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java
@@ -112,7 +112,7 @@ public interface Table extends Serializable {
     @Experimental
     void rollbackTo(String tagName);
 
-    /** Create a empty branch. */
+    /** Create an empty branch. */
     @Experimental
     void createBranch(String branchName);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 426620b51..531c4945b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -85,6 +85,12 @@ public abstract class AbstractDataTableScan implements DataTableScan {
         return this;
     }
 
+    @Override
+    public AbstractDataTableScan withPartitionFilter(List<BinaryRow> partitions) {
+        snapshotReader.withPartitionFilter(partitions);
+        return this;
+    }
+
     @Override
     public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
         snapshotReader.withLevelFilter(levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 613b2efc2..00a4fc0cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -18,10 +18,12 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.utils.Filter;
 
+import java.util.List;
 import java.util.Map;
 
 /** Inner {@link TableScan} contains filter push down. */
@@ -37,6 +39,10 @@ public interface InnerTableScan extends TableScan {
         return this;
     }
 
+    default InnerTableScan withPartitionFilter(List<BinaryRow> partitions) {
+        return this;
+    }
+
     default InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
         return this;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 8db3effd1..c2439de55 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -56,6 +56,8 @@ public interface SnapshotReader {
 
     SnapshotReader withPartitionFilter(Predicate predicate);
 
+    SnapshotReader withPartitionFilter(List<BinaryRow> partitions);
+
     SnapshotReader withMode(ScanMode scanMode);
 
     SnapshotReader withLevelFilter(Filter<Integer> levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 1b58fea91..25c3ffa96 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -157,6 +157,12 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withPartitionFilter(List<BinaryRow> partitions) {
+        scan.withPartitionFilter(partitions);
+        return this;
+    }
+
     @Override
     public SnapshotReader withFilter(Predicate predicate) {
         List<String> partitionKeys = tableSchema.partitionKeys();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 0b922d77b..7192c3630 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -253,6 +253,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withPartitionFilter(List<BinaryRow> partitions) {
+            snapshotReader.withPartitionFilter(partitions);
+            return this;
+        }
+
         @Override
         public SnapshotReader withMode(ScanMode scanMode) {
             snapshotReader.withMode(scanMode);
@@ -358,6 +364,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public InnerTableScan withPartitionFilter(List<BinaryRow> partitions) {
+            batchScan.withPartitionFilter(partitions);
+            return this;
+        }
+
         @Override
         public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
             batchScan.withBucketFilter(bucketFilter);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index cddfaf936..80ca03d8c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -32,13 +32,13 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
 
 /** IT cases for table with branches using SQL. */
 public class BranchSqlITCase extends CatalogITCaseBase {
 
     @Test
     public void testAlterBranchTable() throws Exception {
-
         sql(
                 "CREATE TABLE T ("
                         + " pt INT"
@@ -313,6 +313,80 @@ public class BranchSqlITCase extends CatalogITCaseBase {
         checkSnapshots(snapshotManager, 1, 2);
     }
 
+    @Test
+    public void testFallbackBranchBatchRead() throws Exception {
+        sql(
+                "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
+        sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')");
+
+        sql("CALL sys.create_branch('default.t', 'pk')");
+        sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
+        sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+
+        sql("INSERT INTO `t$branch_pk` VALUES (1, 20, 'cat'), (1, 30, 'dog')");
+        assertThat(collectResult("SELECT v, k FROM t"))
+                .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]");
+        assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+                .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
+
+        sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'tiger'), (2, 20, 'wolf')");
+        assertThat(collectResult("SELECT v, k FROM t"))
+                .containsExactlyInAnyOrder(
+                        "+I[apple, 10]", "+I[banana, 20]", "+I[tiger, 10]", "+I[wolf, 20]");
+        assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+                .containsExactlyInAnyOrder(
+                        "+I[cat, 20]", "+I[dog, 30]", "+I[tiger, 10]", "+I[wolf, 20]");
+        assertThat(collectResult("SELECT v, k FROM t WHERE pt = 1"))
+                .containsExactlyInAnyOrder("+I[apple, 10]", "+I[banana, 20]");
+        assertThat(collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 1"))
+                .containsExactlyInAnyOrder("+I[cat, 20]", "+I[dog, 30]");
+        assertThat(collectResult("SELECT v, k FROM t WHERE pt = 2"))
+                .containsExactlyInAnyOrder("+I[tiger, 10]", "+I[wolf, 20]");
+        assertThat(collectResult("SELECT v, k FROM `t$branch_pk` WHERE pt = 2"))
+                .containsExactlyInAnyOrder("+I[tiger, 10]", "+I[wolf, 20]");
+
+        sql("INSERT INTO `t$branch_pk` VALUES (2, 10, 'lion')");
+        assertThat(collectResult("SELECT v, k FROM t"))
+                .containsExactlyInAnyOrder(
+                        "+I[apple, 10]", "+I[banana, 20]", "+I[lion, 10]", "+I[wolf, 20]");
+        assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+                .containsExactlyInAnyOrder(
+                        "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]");
+
+        sql("INSERT OVERWRITE t PARTITION (pt = 1) VALUES (10, 'pear'), (20, 'mango')");
+        assertThat(collectResult("SELECT v, k FROM t"))
+                .containsExactlyInAnyOrder(
+                        "+I[pear, 10]", "+I[mango, 20]", "+I[lion, 10]", "+I[wolf, 20]");
+        assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+                .containsExactlyInAnyOrder(
+                        "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]");
+
+        sql("ALTER TABLE t RESET ( 'scan.fallback-branch' )");
+        assertThat(collectResult("SELECT v, k FROM t"))
+                .containsExactlyInAnyOrder("+I[pear, 10]", "+I[mango, 20]");
+        assertThat(collectResult("SELECT v, k FROM `t$branch_pk`"))
+                .containsExactlyInAnyOrder(
+                        "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]");
+    }
+
+    @Test
+    public void testDifferentRowTypes() throws Exception {
+        sql(
+                "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
+        sql("CALL sys.create_branch('default.t', 'pk')");
+        sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
+        sql("ALTER TABLE `t$branch_pk` ADD (v2 INT)");
+        sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+
+        try {
+            sql("INSERT INTO t VALUES (1, 10, 'apple')");
+            fail("Expecting exceptions");
+        } catch (Exception e) {
+            assertThat(e)
+                    .hasMessageContaining("Branch main and pk does not have the same row type");
+        }
+    }
+
     private List<String> collectResult(String sql) throws Exception {
         List<String> result = new ArrayList<>();
         try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {


Reply via email to