LiebingYu opened a new issue, #2169: URL: https://github.com/apache/fluss/issues/2169
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Fluss version

main (development)

### Please describe the bug 🐞

The original intent of Fluss union read is to read data jointly from the lake and from Fluss, using the historical data in the lake to complement Fluss's real-time data. However, in the current Flink source implementation, if a partition exists only in the lake and has already expired in Fluss, the union read is very likely to miss that partition's data.

After some debugging, I found that this issue only occurs when partition discovery is enabled (`scanPartitionDiscoveryIntervalMs > 0`). Some clues can be found in `start()`:

```java
@Override
public void start() {
    // ..
    if (isPartitioned) {
        if (streaming) {
            if (lakeSource != null) {
                // we'll need to consider lake splits
                List<SourceSplitBase> hybridLakeFlussSplits = generateHybridLakeFlussSplits();
                if (hybridLakeFlussSplits != null) {
                    LOG.info("Generated hybrid lake splits: {}", hybridLakeFlussSplits);
                    // In this step, the hybrid lake/Fluss splits are appended to pendingSplitAssignment.
                    // Note that no readers are registered yet, so the hybrid splits are not
                    // actually assigned to any reader.
                    handleSplitsAdd(hybridLakeFlussSplits, null);
                }
            }

            if (scanPartitionDiscoveryIntervalMs > 0) {
                // should do partition discovery
                LOG.info(
                        "Starting the FlussSourceEnumerator for table {} "
                                + "with new partition discovery interval of {} ms.",
                        tablePath,
                        scanPartitionDiscoveryIntervalMs);
                // discover new partitions and handle new partitions at fixed delay.
                workerExecutor.callAsyncAtFixedDelay(
                        this::listPartitions,
                        // In this step, hybrid splits whose partitions have already expired
                        // in Fluss are removed from pendingSplitAssignment.
                        this::checkPartitionChanges,
                        0,
                        scanPartitionDiscoveryIntervalMs);
            } else {
                // just call once
                LOG.info(
                        "Starting the FlussSourceEnumerator for table {} without partition discovery.",
                        tablePath);
                workerExecutor.callAsync(this::listPartitions, this::checkPartitionChanges);
            }
        } else {
            startInBatchMode();
        }
    } else {
        if (streaming) {
            startInStreamModeForNonPartitionedTable();
        } else {
            startInBatchMode();
        }
    }
}
```

In other words, the hybrid splits are only queued in `pendingSplitAssignment` because no reader has registered yet, and the first partition-discovery run (initial delay `0`) removes the splits of partitions that no longer exist in Fluss before they can be assigned. As a result, partitions that exist only in the lake are never read.

### Solution

_No response_

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
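The failure sequence described in the bug report can be illustrated with a deliberately simplified model. This is a hypothetical sketch, not Fluss code: the class `UnionReadPartitionExpiryModel`, the `Split` record, the `pendingSplits` map (keyed by partition here for simplicity), and the methods `addPendingSplits` and `dropExpiredPartitions` are illustrative stand-ins for the enumerator's `pendingSplitAssignment`, `handleSplitsAdd`, and `checkPartitionChanges`. It assumes, as the issue describes, that the discovery step drops every pending split whose partition is missing from Fluss, regardless of whether that split reads from the lake.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the enumerator behavior described in this issue.
// None of these names are Fluss APIs; they only mirror the roles of
// pendingSplitAssignment, handleSplitsAdd and checkPartitionChanges.
public class UnionReadPartitionExpiryModel {

    // A split tagged with its partition and whether it reads historical data from the lake.
    record Split(String partition, boolean lakeSplit) {}

    // Splits queued for assignment, keyed by partition (a simplification for this sketch).
    final Map<String, List<Split>> pendingSplits = new HashMap<>();

    // Stand-in for handleSplitsAdd: splits are only queued because no reader is registered yet.
    void addPendingSplits(List<Split> splits) {
        for (Split split : splits) {
            pendingSplits.computeIfAbsent(split.partition(), p -> new ArrayList<>()).add(split);
        }
    }

    // Stand-in for checkPartitionChanges: drops every pending split whose partition is no
    // longer listed in Fluss, even if that split would have read the partition from the lake.
    void dropExpiredPartitions(Set<String> partitionsInFluss) {
        pendingSplits.keySet().removeIf(partition -> !partitionsInFluss.contains(partition));
    }

    public static void main(String[] args) {
        UnionReadPartitionExpiryModel enumerator = new UnionReadPartitionExpiryModel();
        // "2024-01-01" exists only in the lake (expired in Fluss); "2024-01-02" exists in both.
        enumerator.addPendingSplits(List.of(
                new Split("2024-01-01", true),
                new Split("2024-01-02", true),
                new Split("2024-01-02", false)));
        // First discovery run (initial delay 0) sees only the partitions still alive in Fluss.
        enumerator.dropExpiredPartitions(Set.of("2024-01-02"));
        // The lake-only partition has no pending splits left, so no reader will ever read it.
        System.out.println(enumerator.pendingSplits.keySet());
    }
}
```

Running `main` prints `[2024-01-02]`: in this model, the split for the lake-only partition `2024-01-01` is discarded before any reader registers, which matches the missing data observed in the union read.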
