JingGe commented on code in PR #3816:
URL: https://github.com/apache/paimon/pull/3816#discussion_r1693678021


##########
docs/content/maintenance/manage-branches.md:
##########
@@ -159,3 +159,72 @@ Run the following command:
 {{< /tab >}}
 
 {{< /tabs >}}
+
+### Batch Reading from Fallback Branch
+
+You can set the table option `scan.fallback-branch`
+so that when a batch job reads from the current branch, if a partition does 
not exist,
+the reader will try to read this partition from the fallback branch.
+For streaming read jobs, this feature is currently not supported, and will 
only produce results from the current branch.
+
+What's the use case of this feature? Say you have created a Paimon table 
partitioned by date.
+You have a long-running streaming job which inserts records into Paimon, so 
that today's data can be queried in time.
+You also have a batch job which runs at every night to insert corrected 
records of yesterday into Paimon,
+so that the preciseness of the data can be promised.
+
+When you query from this Paimon table, you would like to first read from the 
results of batch job.
+But if a partition (for example, today's partition) does not exist in its 
result,
+then you would like to read from the results of streaming job.
+In this case, you can create a branch for streaming job, and set 
`scan.fallback-branch` to this streaming branch.
+
+Let's look at an example.
+
+{{< tabs "read-fallback-branch" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+-- create Paimon table
+CREATE TABLE T (
+    dt STRING NOT NULL,
+    name STRING NOT NULL,
+    amount BIGINT
+) PARTITIONED BY (dt);
+
+-- create a branch for streaming job
+-- you can even specify primary keys and bucket number, even if the original 
branch has no primary key
+-- in this example, we create a new branch `test`, uses `dt` and `name` as 
primary keys, has a bucket number of 2, and will copy the table options from 
the original branch
+CALL sys.create_branch('default.T', 'test', 'dt, name', 2, true);
+
+-- set fallback branch
+ALTER TABLE T SET (
+    'scan.fallback-branch' = 'test'
+);
+
+-- set changelog producer for the streaming branch, in case a streaming job 
would like to read from it in the future
+ALTER TABLE `T$branch_test` SET (
+    'changelog-producer' = 'lookup'
+);
+
+-- write records into the streaming branch
+INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 
'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6);
+
+-- write records into the default branch
+INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7);
+
+SELECT * FROM T;
+/*
++------------------+------------------+--------+
+|               dt |             name | amount |
++------------------+------------------+--------+
+|         20240725 |            apple |      5 |

Review Comment:
   better first run the query without setting `scan.fallback-branch` and return 
the two rows of 20240725, then set `'scan.fallback-branch' = 'test'` and run 
the query again to return the result of 4 rows with the data from the fallback 
branch. WDYT?



##########
paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java:
##########
@@ -89,6 +91,27 @@ public static FileStoreTable create(
                                 fileIO, tablePath, tableSchema, 
catalogEnvironment)
                         : new PrimaryKeyFileStoreTable(
                                 fileIO, tablePath, tableSchema, 
catalogEnvironment);
-        return table.copy(dynamicOptions.toMap());
+        table = table.copy(dynamicOptions.toMap());
+
+        Options options = new Options(table.options());
+        String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
+        if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
+            Options branchOptions = new Options();
+            branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
+            branchOptions.set(CoreOptions.SCAN_FALLBACK_BRANCH, "");
+            FileStoreTable fallbackTable =
+                    FileStoreTableFactory.create(
+                            fileIO,
+                            tablePath,
+                            new SchemaManager(fileIO, tablePath, 
fallbackBranch).latest().get(),
+                            branchOptions,
+                            catalogEnvironment);
+
+            Preconditions.checkArgument(!(table instanceof 
FallbackReadFileStoreTable));

Review Comment:
   Does it make sense to move the two Preconditions checks to the constructor 
of `FallbackReadFileStoreTable` ?



##########
docs/content/maintenance/manage-branches.md:
##########
@@ -159,3 +159,72 @@ Run the following command:
 {{< /tab >}}
 
 {{< /tabs >}}
+
+### Batch Reading from Fallback Branch
+
+You can set the table option `scan.fallback-branch`
+so that when a batch job reads from the current branch, if a partition does 
not exist,
+the reader will try to read this partition from the fallback branch.
+For streaming read jobs, this feature is currently not supported, and will 
only produce results from the current branch.
+
+What's the use case of this feature? Say you have created a Paimon table 
partitioned by date.
+You have a long-running streaming job which inserts records into Paimon, so 
that today's data can be queried in time.
+You also have a batch job which runs at every night to insert corrected 
records of yesterday into Paimon,
+so that the preciseness of the data can be promised.
+
+When you query from this Paimon table, you would like to first read from the 
results of batch job.
+But if a partition (for example, today's partition) does not exist in its 
result,
+then you would like to read from the results of streaming job.
+In this case, you can create a branch for streaming job, and set 
`scan.fallback-branch` to this streaming branch.
+
+Let's look at an example.
+
+{{< tabs "read-fallback-branch" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+-- create Paimon table
+CREATE TABLE T (
+    dt STRING NOT NULL,
+    name STRING NOT NULL,
+    amount BIGINT
+) PARTITIONED BY (dt);
+
+-- create a branch for streaming job
+-- you can even specify primary keys and bucket number, even if the original 
branch has no primary key
+-- in this example, we create a new branch `test`, uses `dt` and `name` as 
primary keys, has a bucket number of 2, and will copy the table options from 
the original branch
+CALL sys.create_branch('default.T', 'test', 'dt, name', 2, true);
+
+-- set fallback branch
+ALTER TABLE T SET (
+    'scan.fallback-branch' = 'test'

Review Comment:
   How to reset it? 'scan.fallback-branch' = null ?



##########
docs/content/maintenance/manage-branches.md:
##########
@@ -159,3 +159,72 @@ Run the following command:
 {{< /tab >}}
 
 {{< /tabs >}}
+
+### Batch Reading from Fallback Branch
+
+You can set the table option `scan.fallback-branch`
+so that when a batch job reads from the current branch, if a partition does 
not exist,
+the reader will try to read this partition from the fallback branch.
+For streaming read jobs, this feature is currently not supported, and will 
only produce results from the current branch.

Review Comment:
   What would be the expected behaviour if the feature for the streaming read 
had correctly implemented?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to