hemanthsavasere opened a new issue, #2807:
URL: https://github.com/apache/fluss/issues/2807

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.
   
   
   ### Fluss version
   
   0.9.0 (latest release)
   
   ### Please describe the bug 🐞
   
   Error:  Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 19.961 s <<< FAILURE! - in org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScannerITCase
   Error:  org.apache.fluss.client.table.scanner.batch.KvSnapshotBatchScannerITCase.testKvSnapshotLease  Time elapsed: 3.16 s  <<< FAILURE!
   org.opentest4j.AssertionFailedError: 
   
   expected: [1L, 1L, 1L]
    but was: [1L, 0L, 1L]
   
   The failing test ran as part of
   https://github.com/apache/fluss/actions/runs/22756751008/job/66003123256
   
   ### Solution
   
    Root Cause Analysis

    Primary Race: Async triggerSnapshot in FlussClusterExtension

    In FlussClusterExtension.triggerSnapshot() (lines 747-775):

    snapshotId = kvSnapshotManager.currentSnapshotId();     // read counter (e.g., 1)
    kvSnapshotManager.triggerSnapshot();                    // ASYNC - queues to guardedExecutor
    nextSnapshotId = kvSnapshotManager.currentSnapshotId(); // read counter again

    PeriodicSnapshotManager.triggerSnapshot() (line 186) submits the work via guardedExecutor.execute(...), which is asynchronous. The actual initSnapshot() call, which increments snapshotIdCounter via getAndIncrement() at KvTabletSnapshotTarget.java:183, runs on the guardedExecutor thread, NOT the calling thread.

    The race:

    1. Test thread reads snapshotId = 1 (counter value after first snapshot)
    2. Test thread calls triggerSnapshot() → queues async task
    3. Test thread reads nextSnapshotId = 1 (counter hasn't been incremented yet)
    4. nextSnapshotId == snapshotId → method returns null
    5. triggerAndWaitSnapshots skips this bucket entirely

    The async task eventually runs and creates snapshot 1, but by then the test has already moved on. When getLatestKvSnapshots() is called, this bucket may still show snapshot 0.
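
The interleaving above can be reproduced in isolation. The following is only a minimal sketch, not Fluss code: `AsyncTriggerRaceDemo`, its local `guardedExecutor`, and the `AtomicLong` counter are hypothetical stand-ins, and a `CountDownLatch` is used to force the unlucky ordering so the result is deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncTriggerRaceDemo {

    /** Returns {idBefore, idAfter} as observed by the calling thread. */
    static long[] readAroundAsyncTrigger() {
        // Hypothetical stand-ins for snapshotIdCounter and guardedExecutor.
        AtomicLong snapshotIdCounter = new AtomicLong(1);
        ExecutorService guardedExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch callerHasRead = new CountDownLatch(1);

        long snapshotId = snapshotIdCounter.get();

        // "triggerSnapshot()": only queues the work. The latch holds the task
        // back until the caller has re-read the counter (the losing interleaving).
        guardedExecutor.execute(() -> {
            awaitQuietly(callerHasRead);
            snapshotIdCounter.getAndIncrement();
        });

        long nextSnapshotId = snapshotIdCounter.get(); // counter not incremented yet
        callerHasRead.countDown();
        guardedExecutor.shutdown(); // the queued task still runs to completion
        return new long[] {snapshotId, nextSnapshotId};
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        long[] ids = readAroundAsyncTrigger();
        // prints "before=1, after=1": the caller concludes nothing was triggered
        System.out.println("before=" + ids[0] + ", after=" + ids[1]);
    }
}
```

In the real test the ordering is decided by thread scheduling rather than a latch, which would explain why the failure is intermittent.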
                                                                                
                                                                                
                                            
    Secondary Race: Log offset not yet flushed to KV

    In KvTabletSnapshotTarget.initSnapshot() (lines 170-203), there is a check:

    if (logOffset <= logOffsetOfLatestSnapshot) {
        return Optional.empty();  // skip: no new data
    }

    Even if the async trigger runs, the snapshot is skipped when the KV tablet has not yet applied the new log entries (from putRows), because the flushed log offset has not advanced.
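
To make the effect of that check concrete, here is a simplified sketch. `tryInitSnapshot` and its parameters are hypothetical and do not mirror the real initSnapshot() signature; only the comparison is taken from the snippet above.

```java
import java.util.Optional;

public class SnapshotSkipCheckDemo {

    /**
     * Hypothetical, simplified version of the guard: a snapshot ID is handed out
     * only if the flushed log offset moved past the latest snapshot's offset.
     */
    static Optional<Long> tryInitSnapshot(
            long logOffset, long logOffsetOfLatestSnapshot, long nextSnapshotId) {
        if (logOffset <= logOffsetOfLatestSnapshot) {
            return Optional.empty(); // skip: no new data
        }
        return Optional.of(nextSnapshotId);
    }

    public static void main(String[] args) {
        // New rows written but not yet applied: the offset is unchanged, so no snapshot.
        System.out.println(tryInitSnapshot(10, 10, 1)); // prints "Optional.empty"
        // Offset advanced: a snapshot with ID 1 would be started.
        System.out.println(tryInitSnapshot(12, 10, 1)); // prints "Optional[1]"
    }
}
```

In the skipped case the snapshot ID counter never moves, so a caller comparing IDs before and after the trigger sees no change even though the trigger did run.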
                                                                                
                                                                                
                                            
    Flow of the failure

    1. triggerAndWaitSnapshots → for bucket 1, triggerSnapshot returns null (race) → bucket 1 is skipped
    2. Test calls admin.getLatestKvSnapshots() → bucket 1 still at snapshot 0
    3. acquireSnapshots stores {bucket0: 1, bucket1: 0, bucket2: 1}
    4. checkKvSnapshotLeaseEquals expects [1L, 1L, 1L] but gets [1L, 0L, 1L]

    Fix

    Modify FlussClusterExtension.triggerSnapshot() to handle the asynchronous trigger properly. Instead of checking currentSnapshotId() immediately after the async trigger, use a retry/poll approach that waits for the snapshot ID to increment.
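
One possible shape for such a poll is sketched below. This is an assumption about how the fix could look, not the actual patch: `SnapshotIdPoller`, `waitForSnapshotIdAdvance`, the 10 ms poll interval, and the timeout values are all hypothetical, and the counter is passed in as a plain `LongSupplier` rather than the real kvSnapshotManager.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class SnapshotIdPoller {

    /**
     * Polls currentId until it exceeds previousId or the timeout elapses.
     * Returns true if the ID advanced, false on timeout or interruption.
     */
    static boolean waitForSnapshotIdAdvance(
            LongSupplier currentId, long previousId, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (currentId.getAsLong() <= previousId) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: the ID never advanced
            }
            try {
                Thread.sleep(10); // hypothetical poll interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong(1); // stand-in for the snapshot ID counter
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        long before = counter.get();
        // Simulates the async trigger incrementing the counter a little later.
        executor.schedule(() -> { counter.getAndIncrement(); }, 50, TimeUnit.MILLISECONDS);

        boolean advanced = waitForSnapshotIdAdvance(counter::get, before, Duration.ofSeconds(5));
        executor.shutdown();
        System.out.println("advanced=" + advanced + ", id=" + counter.get());
    }
}
```

The timeout matters because of the secondary race: if the snapshot is skipped for lack of new data, the ID never increments, and the caller needs a way to tell "not yet" from "never".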
                                                                                
                                            
                                                                                
                                                                                
                                            
    File to modify

   - fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java (lines 747-775)

   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

