wuchong commented on code in PR #2477:
URL: https://github.com/apache/fluss/pull/2477#discussion_r2778783386
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/ChangelogVirtualTableITCase.java:
##########
@@ -377,6 +377,25 @@ public void testChangelogWithScanStartupMode() throws Exception {
.containsExactlyInAnyOrder(
"+I[+I, 3, 1970-01-01T00:00:00.200Z, 4, v4]",
"+I[+I, 4, 1970-01-01T00:00:00.200Z, 5, v5]");
+
+ // 3. Test scan.startup.mode='latest' - should only read new records after subscription
+ String optionsLatest = " /*+ OPTIONS('scan.startup.mode' = 'latest') */";
+ String queryLatest = "SELECT id FROM startup_mode_test$changelog" + optionsLatest;
+ CloseableIterator<Row> rowIterLatest = tEnv.executeSql(queryLatest).collect();
+ List<String> latestResults = new ArrayList<>();
+ for (int attempt = 0; attempt < 10; attempt++) {
+ // Write a new record (with id larger than 5)
+ int rowId = 6 + attempt;
+ writeRows(conn, tablePath, Arrays.asList(row(rowId, "v" + rowId)), false);
+
+ // Try to fetch one record with a 5-second timeout
+ latestResults = collectRowsWithTimeout(rowIterLatest, 1, Duration.ofSeconds(5));
Review Comment:
I’ve thought about this again, and a more elegant approach might be to
leverage **Flink’s savepoint mechanism** to explicitly force the initialization
of the start offset. Here’s the refined idea:
1. **Pre-populate** the source table with some existing records.
2. Launch a job:
```sql
INSERT INTO sink SELECT * FROM source /*+ OPTIONS('scan.startup.mode' = 'latest') */;
```
3. **Stop the job with a savepoint** (e.g., using
`FlinkOperator#stopWithSavepoint`, as shown in [this
example](https://github.com/apache/fluss/blob/main/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceFailOverITCase.java#L243)).
→ At this point, the “latest” offset has been **determined, initialized,
and checkpointed** into the state.
4. **Restart the job from the savepoint** (see the example in
`FlinkTableSourceFailOverITCase#initTableEnvironment`).
5. **Write new records** into the source table.
6. **Read from the sink table** — the output should contain **only the
records written in step 5**, confirming that the job correctly started reading
from the offset captured at savepoint time.
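
The steps above could be sketched as a Flink SQL session (a rough sketch, not the exact test code: it assumes Flink 1.17+ SQL Client syntax for `STOP JOB ... WITH SAVEPOINT`, and the table names, job id, and savepoint path are placeholders):

```sql
-- 1. Pre-populate the source table with some existing records.
INSERT INTO source VALUES (1, 'v1'), (2, 'v2');

-- 2. Launch the job that reads from the latest offset.
INSERT INTO sink SELECT * FROM source /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- 3. Stop the job with a savepoint; at this point the "latest" offset
--    has been determined and checkpointed into the state.
--    ('<job_id>' is a placeholder for the actual job id.)
STOP JOB '<job_id>' WITH SAVEPOINT;

-- 4. Restart the job from the savepoint (path is a placeholder).
SET 'execution.savepoint.path' = '/tmp/savepoints/savepoint-xxxx';
INSERT INTO sink SELECT * FROM source /*+ OPTIONS('scan.startup.mode' = 'latest') */;

-- 5. Write new records into the source table.
INSERT INTO source VALUES (6, 'v6');

-- 6. Read from the sink; only the records written in step 5 should appear.
SELECT * FROM sink;
```

In a test, steps 3 and 4 would go through the programmatic equivalents referenced above (`stopWithSavepoint` and restoring via the savepoint path) rather than SQL Client statements.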
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]