LiebingYu opened a new issue, #2169:
URL: https://github.com/apache/fluss/issues/2169

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Fluss version
   
   main (development)
   
   ### Please describe the bug 🐞
   
   The original intent of Fluss union read is to jointly read data from both 
the lake and Fluss, leveraging historical data in the lake to complement 
Fluss’s real-time data. However, in the current Flink source implementation, if 
a partition exists only in the lake but has already expired in Fluss, the union 
read is very likely to miss the data from that partition.
   
   After some debugging, I found that this issue only occurs when partition 
discovery is enabled. The enumerator's `start()` method gives some clues.
   ```java
       @Override
       public void start() {
           // ...
   
           if (isPartitioned) {
               if (streaming) {
                   if (lakeSource != null) {
                       // we'll need to consider lake splits
                       List<SourceSplitBase> hybridLakeFlussSplits = 
generateHybridLakeFlussSplits();
                       if (hybridLakeFlussSplits != null) {
                           LOG.info("Generated hybrid lake splits: {}", 
hybridLakeFlussSplits);
                           // In this step, the hybrid lake/Fluss splits are appended to
                           // pendingSplitAssignment. Note that no readers are registered
                           // yet, so the hybrid splits are not actually assigned to readers.
                           handleSplitsAdd(hybridLakeFlussSplits, null);
                       }
                   }
   
                   if (scanPartitionDiscoveryIntervalMs > 0) {
                       // should do partition discovery
                       LOG.info(
                               "Starting the FlussSourceEnumerator for table {} "
                                       + "with new partition discovery interval of {} ms.",
                               tablePath,
                               scanPartitionDiscoveryIntervalMs);
                       // discover new partitions and handle new partitions at fixed delay.
                       workerExecutor.callAsyncAtFixedDelay(
                               this::listPartitions,
                               // In this step, hybrid splits whose partitions have already
                               // expired in Fluss are removed from pendingSplitAssignment.
                               this::checkPartitionChanges,
                               0,
                               scanPartitionDiscoveryIntervalMs);
                   } else {
                       // just call once
                       LOG.info(
                               "Starting the FlussSourceEnumerator for table {} "
                                       + "without partition discovery.",
                               tablePath);
                       workerExecutor.callAsync(this::listPartitions, 
this::checkPartitionChanges);
                   }
               } else {
                   startInBatchMode();
               }
           } else {
               if (streaming) {
                   startInStreamModeForNonPartitionedTable();
               } else {
                   startInBatchMode();
               }
           }
       }
   ```
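
The sequence described in the comments above can be modeled with a small standalone sketch. This is not Fluss code: the class `PendingSplitSketch`, its methods, and the partition and split names are all hypothetical, and the model assumes that the partition check simply drops pending splits for any partition Fluss no longer lists. It only illustrates why a split for a lake-only partition can disappear before a reader registers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the enumerator's bookkeeping; not Fluss code.
class PendingSplitSketch {
    // partition name -> splits waiting for a reader to register
    private final Map<String, List<String>> pendingSplits = new LinkedHashMap<>();

    // Models splits being added in start() before any reader exists.
    void addSplit(String partition, String split) {
        pendingSplits.computeIfAbsent(partition, p -> new ArrayList<>()).add(split);
    }

    // Models the partition check (assumption): pending splits of partitions
    // that Fluss no longer lists are dropped.
    void checkPartitionChanges(Set<String> partitionsInFluss) {
        pendingSplits.keySet().removeIf(p -> !partitionsInFluss.contains(p));
    }

    // Models a reader registering later and receiving whatever is still pending.
    List<String> assignAllToReader() {
        List<String> assigned = new ArrayList<>();
        pendingSplits.values().forEach(assigned::addAll);
        pendingSplits.clear();
        return assigned;
    }

    static List<String> simulate() {
        PendingSplitSketch enumerator = new PendingSplitSketch();
        // Partition that exists only in the lake (already expired in Fluss).
        enumerator.addSplit("2024-01-01", "lake-split-2024-01-01");
        // Partition that still exists in Fluss.
        enumerator.addSplit("2024-06-01", "hybrid-split-2024-06-01");
        // The partition check runs before any reader has registered.
        enumerator.checkPartitionChanges(Set.of("2024-06-01"));
        return enumerator.assignAllToReader();
    }

    public static void main(String[] args) {
        // prints [hybrid-split-2024-06-01]; the lake-only split is gone
        System.out.println(simulate());
    }
}
```

In this model the lake-only split is lost because the removal step cannot distinguish "expired in Fluss but still readable from the lake" from "no longer needed".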
   
   ### Solution
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

