JingGe commented on code in PR #3816:
URL: https://github.com/apache/paimon/pull/3816#discussion_r1693678021
##########
docs/content/maintenance/manage-branches.md:
##########
@@ -159,3 +159,72 @@ Run the following command:
{{< /tab >}}
{{< /tabs >}}
+
+### Batch Reading from Fallback Branch
+
+You can set the table option `scan.fallback-branch`
+so that when a batch job reads from the current branch, if a partition does not exist,
+the reader will try to read this partition from the fallback branch.
+For streaming read jobs, this feature is currently not supported, and they will only produce results from the current branch.
+
+What's the use case of this feature? Say you have created a Paimon table partitioned by date.
+You have a long-running streaming job which inserts records into Paimon, so that today's data can be queried in time.
+You also have a batch job which runs every night to insert corrected records of yesterday into Paimon,
+so that the accuracy of the data can be guaranteed.
+
+When you query from this Paimon table, you would like to first read from the results of the batch job.
+But if a partition (for example, today's partition) does not exist in its result,
+then you would like to read from the results of the streaming job.
+In this case, you can create a branch for the streaming job, and set `scan.fallback-branch` to this streaming branch.
+
+Let's look at an example.
+
+{{< tabs "read-fallback-branch" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+-- create Paimon table
+CREATE TABLE T (
+ dt STRING NOT NULL,
+ name STRING NOT NULL,
+ amount BIGINT
+) PARTITIONED BY (dt);
+
+-- create a branch for the streaming job
+-- you can specify primary keys and a bucket number, even if the original branch has no primary key
+-- in this example, we create a new branch `test`, which uses `dt` and `name` as primary keys, has a bucket number of 2, and copies the table options from the original branch
+CALL sys.create_branch('default.T', 'test', 'dt, name', 2, true);
+
+-- set fallback branch
+ALTER TABLE T SET (
+ 'scan.fallback-branch' = 'test'
+);
+
+-- set changelog producer for the streaming branch, in case a streaming job would like to read from it in the future
+ALTER TABLE `T$branch_test` SET (
+ 'changelog-producer' = 'lookup'
+);
+
+-- write records into the streaming branch
+INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6);
+
+-- write records into the default branch
+INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7);
+
+SELECT * FROM T;
+/*
++------------------+------------------+--------+
+| dt | name | amount |
++------------------+------------------+--------+
+| 20240725 | apple | 5 |
Review Comment:
It would be better to first run the query without setting `scan.fallback-branch`,
which returns the two rows of 20240725, then set `'scan.fallback-branch' = 'test'`
and run the query again, which returns 4 rows including the data from the fallback
branch. WDYT?
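To make the expected before/after results concrete, here is a plain-Java model of the partition-level fallback described in this doc hunk, fed with the example data above. It is a hypothetical sketch, not Paimon code: `FallbackReadSketch`, `Row`, `read` and `byPartition` are illustrative names, and each branch is simply assumed to be a map from the `dt` partition value to its rows.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of partition-level fallback reading (not Paimon API). */
public class FallbackReadSketch {

    /** One row of table T: (dt, name, amount). */
    public record Row(String dt, String name, long amount) {}

    /**
     * Returns every row of the current branch, plus the rows of each partition
     * that exists only in the fallback branch.
     */
    public static List<Row> read(Map<String, List<Row>> current, Map<String, List<Row>> fallback) {
        List<Row> result = new ArrayList<>();
        // Partitions present in the current branch always win.
        for (List<Row> rows : current.values()) {
            result.addAll(rows);
        }
        // Partitions missing from the current branch are taken from the fallback branch.
        for (Map.Entry<String, List<Row>> entry : fallback.entrySet()) {
            if (!current.containsKey(entry.getKey())) {
                result.addAll(entry.getValue());
            }
        }
        return result;
    }

    /** Groups rows by their dt partition, keeping insertion order. */
    public static Map<String, List<Row>> byPartition(Row... rows) {
        Map<String, List<Row>> partitions = new LinkedHashMap<>();
        for (Row row : rows) {
            partitions.computeIfAbsent(row.dt(), k -> new ArrayList<>()).add(row);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Default branch of T.
        Map<String, List<Row>> current = byPartition(
                new Row("20240725", "apple", 5), new Row("20240725", "banana", 7));
        // Branch `test` (the streaming branch).
        Map<String, List<Row>> fallback = byPartition(
                new Row("20240725", "apple", 4), new Row("20240725", "peach", 10),
                new Row("20240726", "cherry", 3), new Row("20240726", "pear", 6));

        // Without scan.fallback-branch: only the two 20240725 rows.
        System.out.println(read(current, Map.of()));
        // With scan.fallback-branch = 'test': 20240725 from T, 20240726 from `test`.
        System.out.println(read(current, fallback));
    }
}
```

Note that the fallback is decided per partition, not per row: `('20240725', 'peach', 10)` from `test` is not returned, because partition 20240725 already exists in the default branch.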
##########
paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java:
##########
@@ -89,6 +91,27 @@ public static FileStoreTable create(
fileIO, tablePath, tableSchema,
catalogEnvironment)
: new PrimaryKeyFileStoreTable(
fileIO, tablePath, tableSchema,
catalogEnvironment);
- return table.copy(dynamicOptions.toMap());
+ table = table.copy(dynamicOptions.toMap());
+
+ Options options = new Options(table.options());
+ String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
+ if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
+ Options branchOptions = new Options();
+ branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
+ branchOptions.set(CoreOptions.SCAN_FALLBACK_BRANCH, "");
+ FileStoreTable fallbackTable =
+ FileStoreTableFactory.create(
+ fileIO,
+ tablePath,
+ new SchemaManager(fileIO, tablePath, fallbackBranch).latest().get(),
+ branchOptions,
+ catalogEnvironment);
+
+ Preconditions.checkArgument(!(table instanceof FallbackReadFileStoreTable));
Review Comment:
Does it make sense to move the two Preconditions checks to the constructor
of `FallbackReadFileStoreTable`?
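A minimal sketch of what moving the checks into the constructor could look like. Everything here is a hypothetical stand-in: `Table`, `SimpleTable` and `FallbackReadTable` are not Paimon classes, `checkArgument` is a local replacement for `Preconditions.checkArgument`, and the second check (on the fallback table) is an assumption, since only the check on `table` is visible in this hunk.

```java
/** Hypothetical sketch: validation inside the wrapper's constructor (not Paimon API). */
public class ConstructorCheckSketch {

    /** Stand-in for FileStoreTable. */
    public interface Table {}

    /** Stand-in for an ordinary, non-fallback table. */
    public static final class SimpleTable implements Table {}

    /** Stand-in for FallbackReadFileStoreTable. */
    public static final class FallbackReadTable implements Table {
        private final Table main;
        private final Table fallback;

        public FallbackReadTable(Table main, Table fallback) {
            // Both checks live here, so every caller gets them, not only the factory.
            checkArgument(!(main instanceof FallbackReadTable),
                    "main table must not itself be a fallback-read table");
            // Assumed second check: the fallback table must not be wrapped either.
            checkArgument(!(fallback instanceof FallbackReadTable),
                    "fallback table must not itself be a fallback-read table");
            this.main = main;
            this.fallback = fallback;
        }

        public Table main() {
            return main;
        }

        public Table fallback() {
            return fallback;
        }
    }

    /** Local stand-in for a checkArgument(boolean, String) precondition helper. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        Table ok = new FallbackReadTable(new SimpleTable(), new SimpleTable());
        try {
            // Nesting a fallback-read table is rejected by the constructor itself.
            new FallbackReadTable(ok, new SimpleTable());
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The design point is that the invariant then holds for any code path that builds the wrapper (for example a future `copy(...)` implementation), rather than only for `FileStoreTableFactory.create`.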
##########
docs/content/maintenance/manage-branches.md:
##########
+-- set fallback branch
+ALTER TABLE T SET (
+ 'scan.fallback-branch' = 'test'
Review Comment:
How can it be reset? By setting 'scan.fallback-branch' = null?
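Judging only from the `FileStoreTableFactory` hunk in this PR, the fallback table is created only when the resolved `scan.fallback-branch` value is not null, empty, or whitespace-only; the factory itself sets it to `""` on the fallback table to switch the feature off there. The sketch below models just that decision. `FallbackOptionSketch` and `fallbackEnabled` are hypothetical names, and `isBlank()` is used as an approximation of `StringUtils.isNullOrWhitespaceOnly`. Whether `= null` is accepted by `ALTER TABLE ... SET` is not shown here; removing the option (for example with Flink's `ALTER TABLE ... RESET (...)`, where supported) would presumably have the same effect as an empty value.

```java
import java.util.Map;

/** Hypothetical model of the "is fallback enabled?" check from the factory diff. */
public class FallbackOptionSketch {

    /** Option key, as used in the docs hunk of this PR. */
    static final String SCAN_FALLBACK_BRANCH = "scan.fallback-branch";

    /** Approximates the null/whitespace-only check applied to the option value. */
    static boolean isNullOrWhitespaceOnly(String value) {
        return value == null || value.isBlank();
    }

    /** True if, under this model, a fallback table would be created. */
    public static boolean fallbackEnabled(Map<String, String> options) {
        return !isNullOrWhitespaceOnly(options.get(SCAN_FALLBACK_BRANCH));
    }

    public static void main(String[] args) {
        // Option set to a branch name: fallback reading is active.
        System.out.println(fallbackEnabled(Map.of(SCAN_FALLBACK_BRANCH, "test")));
        // Option removed: get() returns null, so there is no fallback.
        System.out.println(fallbackEnabled(Map.of()));
        // Option set to "", as the factory does for the fallback table itself.
        System.out.println(fallbackEnabled(Map.of(SCAN_FALLBACK_BRANCH, "")));
    }
}
```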
##########
docs/content/maintenance/manage-branches.md:
##########
+For streaming read jobs, this feature is currently not supported, and they will only produce results from the current branch.
Review Comment:
What would be the expected behaviour for streaming reads if this feature were
implemented correctly for them?
--
This is an automated message from the Apache Git Service.