xiearthur commented on issue #12661:
URL: https://github.com/apache/hudi/issues/12661#issuecomment-2599472675

   I can confirm this issue and have traced the source code to understand why.
   
   **Root Cause Analysis:**
   In `IncrementalInputSplits.java`, the core logic for streaming reads is:
   
   ```java
   public List<HoodieInstant> filterInstantsWithRange(
         HoodieTimeline commitTimeline,
         @Nullable final String issuedInstant) {
       // For continuous streaming read: only pick up instants strictly newer
       // than the last issued instant
       if (issuedInstant != null) {
           return completedTimeline
               .getInstantsAsStream()
               .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
               .collect(Collectors.toList());
       }

       // For the initial read
       Stream<HoodieInstant> instantStream = completedTimeline.getInstantsAsStream();
       if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) {
           // snapshot read - only reads the latest commit
           return completedTimeline.lastInstant().map(Collections::singletonList).orElseGet(Collections::emptyList);
       }

       // With a specific start commit time
       if (OptionsResolver.isSpecificStartCommit(this.conf)) {
           final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT);
           instantStream = instantStream
               .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, startCommit));
       }
       // (remaining filters elided) finally the stream is collected:
       return instantStream.collect(Collectors.toList());
   }
   ```
   
   Currently, streaming reads only work correctly with the `earliest` setting, because:
   1. In snapshot mode, only the latest commit is read and new data is not monitored.
   2. With a specific start timestamp, only data committed before the Flink job starts is read.
   3. Only with `earliest` does the job:
      - start reading from the earliest data,
      - correctly update `issuedInstant`, and
      - continue monitoring new data.
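
   To make the three branches concrete, here is a small, self-contained simulation of the filtering logic (the class and helper names are mine, not Hudi's). Instants are modeled as plain fixed-length timestamp strings, so lexicographic comparison stands in for `HoodieTimeline.compareTimestamps`:

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.stream.Collectors;

   public class InstantFilterSketch {
       /** Streaming poll: only instants strictly newer than the issued instant. */
       static List<String> afterIssued(List<String> timeline, String issuedInstant) {
           return timeline.stream()
               .filter(t -> t.compareTo(issuedInstant) > 0)
               .collect(Collectors.toList());
       }

       /** Snapshot read: only the latest commit, no monitoring of new data. */
       static List<String> snapshot(List<String> timeline) {
           return timeline.isEmpty()
               ? Collections.emptyList()
               : Collections.singletonList(timeline.get(timeline.size() - 1));
       }

       /** Initial read with a specific start commit: instants >= startCommit. */
       static List<String> fromStartCommit(List<String> timeline, String startCommit) {
           return timeline.stream()
               .filter(t -> t.compareTo(startCommit) >= 0)
               .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           List<String> timeline = Arrays.asList("001", "002", "003", "004");
           System.out.println(snapshot(timeline));               // [004]
           System.out.println(fromStartCommit(timeline, "003")); // [003, 004]
           System.out.println(afterIssued(timeline, "002"));     // [003, 004]
       }
   }
   ```

   The sketch shows the asymmetry: only the `afterIssued` branch is re-evaluated on each streaming poll, while the other two branches fire once at startup.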
   
   **Question:**
   We have a large amount of historical data, and reading from the earliest commit every time we start the Flink job consumes significant resources. Is there any way to:
   1. Start reading from a specific timestamp
   2. Still be able to monitor and read new data changes
   3. Avoid processing all historical data
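
   For what it's worth, the semantics we are asking for could be sketched as: apply `READ_START_COMMIT` only to the very first poll, then hand over to `issuedInstant` for every subsequent poll. A minimal stand-alone simulation of that idea (hypothetical class, plain string timestamps instead of Hudi instants):

   ```java
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   public class StreamingFromStartCommit {
       // Hypothetical sketch: start from a specific commit, then keep
       // consuming newly completed commits incrementally.
       private final String startCommit;
       private String issuedInstant; // last instant handed out; null before first poll

       StreamingFromStartCommit(String startCommit) {
           this.startCommit = startCommit;
       }

       /** One polling cycle over the current completed timeline. */
       List<String> poll(List<String> timeline) {
           List<String> result = timeline.stream()
               // first poll: instants >= startCommit; later polls: > issuedInstant
               .filter(t -> issuedInstant == null
                   ? t.compareTo(startCommit) >= 0
                   : t.compareTo(issuedInstant) > 0)
               .collect(Collectors.toList());
           if (!result.isEmpty()) {
               // advance the watermark so the next poll only sees newer commits
               issuedInstant = result.get(result.size() - 1);
           }
           return result;
       }

       public static void main(String[] args) {
           StreamingFromStartCommit reader = new StreamingFromStartCommit("003");
           // initial read skips historical commits 001 and 002
           System.out.println(reader.poll(Arrays.asList("001", "002", "003", "004")));
           // a later poll only sees the newly arrived commit 005
           System.out.println(reader.poll(Arrays.asList("001", "002", "003", "004", "005")));
       }
   }
   ```

   This would give us exactly points 1-3 above: a bounded initial catch-up from the chosen commit, followed by ordinary incremental streaming.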
   
   Looking forward to your suggestions on this.

