wwj6591812 opened a new pull request, #7991:
URL: https://github.com/apache/paimon/pull/7991

   ### Problem
   
   With `scan.dedicated-split-generation=true`, `LIMIT N` returns correct
   results but still scans the entire ORC file. The Flink UI shows the Limit
   stage taking ~19s to emit 10 records.
   
   ```sql
   /*+ config(job_name=hl_0527, cluster=hongli-duibi-0528) */ 
   select
     *
   from
     `PAIMON-MAINSE-DUMP-EA-20250514`.`PAIMON-MAINSE-DUMP-EA-20250514_MAINSE_DUMP`.`APP10000004119_MAINSE_PAIMON_D20260526194436`
     /*+ `OPTIONS`('scan.dedicated-split-generation'='true', 'parallelism' = '1') */
   limit
     10;
   ```
   
   ```Flink UI
   <img width="1918" height="1028" alt="image" src="https://github.com/user-attachments/assets/c0cfa4d6-2882-4976-a162-a406cafa6fd0" />
   ```
   
   ### Root Cause
   
   **Before**, `ReadOperator` checked the limit *after* `hasNext()`. One extra 
`hasNext()` after N rows triggered `RecordReaderIterator.advanceIfNeeded()`, 
which loops `readBatch()` until EOF:
   
   ```java
   while (iterator.hasNext()) {   // extra hasNext() after N rows
       if (reachLimit()) return;
       output.collect(iterator.next());
   }
   ```
   
   With `withLimit(N)`, `ApplyBitmapIndexRecordReader` stops yielding rows 
after position N−1, but the iterator still reads every remaining ORC batch.
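
   The effect can be reproduced in isolation with a toy model. The sketch
   below is not Paimon code: `CountingReader`, `DrainingIterator`, and the
   batch counts are hypothetical stand-ins, chosen only to mimic a reader that
   stops yielding rows after the limit and an iterator whose `hasNext()` keeps
   calling `readBatch()` until it finds a row or reaches EOF.

   ```java
   import java.util.NoSuchElementException;

   /** Toy model of the pre-fix read loop. All classes are hypothetical stand-ins, not Paimon code. */
   final class ExtraHasNextDemo {

       /** Serves a fixed number of batches, but yields rows only until `limit` rows were produced. */
       static final class CountingReader {
           private final int totalBatches;
           private final int batchSize;
           private final int limit;
           int batchesRead = 0;
           private int produced = 0;

           CountingReader(int totalBatches, int batchSize, int limit) {
               this.totalBatches = totalBatches;
               this.batchSize = batchSize;
               this.limit = limit;
           }

           /** Returns the number of rows yielded by the next batch, or -1 at end of file. */
           int readBatch() {
               if (batchesRead == totalBatches) {
                   return -1;
               }
               batchesRead++;
               int rows = Math.max(0, Math.min(batchSize, limit - produced));
               produced += rows;
               return rows;
           }
       }

       /** hasNext() keeps reading batches until it finds a row or hits EOF. */
       static final class DrainingIterator {
           private final CountingReader reader;
           private int pending = 0;
           private boolean eof = false;

           DrainingIterator(CountingReader reader) {
               this.reader = reader;
           }

           boolean hasNext() {
               while (pending == 0 && !eof) {
                   int rows = reader.readBatch();
                   if (rows < 0) {
                       eof = true;
                   } else {
                       pending = rows;
                   }
               }
               return pending > 0;
           }

           void next() {
               if (!hasNext()) {
                   throw new NoSuchElementException();
               }
               pending--;
           }
       }

       /** Pre-fix loop shape: the limit is checked only after hasNext() has already run. */
       static int batchesReadWhenCheckingAfterHasNext(int totalBatches, int batchSize, int limit) {
           CountingReader reader = new CountingReader(totalBatches, batchSize, limit);
           DrainingIterator it = new DrainingIterator(reader);
           int emitted = 0;
           while (it.hasNext()) {
               if (emitted >= limit) {
                   break;
               }
               it.next();
               emitted++;
           }
           return reader.batchesRead;
       }

       public static void main(String[] args) {
           // 100 batches of 10 rows, limit 10: every batch is still read.
           System.out.println(batchesReadWhenCheckingAfterHasNext(100, 10, 10)); // prints 100
       }
   }
   ```

   In this model the limit check inside the loop never even fires: the extra
   `hasNext()` call drains all remaining batches and then returns `false`.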
   
   The FLIP-27 path already avoids this — `FileStoreSourceSplitReader` checks 
`RecordLimiter` **before** `readBatch()`:
   
   ```java
   nextBatch = reachLimit() ? null : currentReader.recordReader().readBatch();
   ```
   
   ### Evidence
   
   **Flink UI:** Limit operator ~19s, 10 records.
   
   **Arthas:**
   
   ```bash
   watch org.apache.paimon.reader.RecordReaderIterator advanceIfNeeded '{#cost}' '#cost>500' -n 1
   # → cost=12501ms (single call after 10th row)
   ```
   
   ### Fix
   
   Use `RecordLimiter` and check limit **before** `hasNext()` via short-circuit 
evaluation:
   
   ```java
   while (!reachLimit() && iterator.hasNext()) {
       output.collect(...);
       recordLimiter.increment();
   }
   
   private boolean reachLimit() {
       return recordLimiter != null && recordLimiter.reachLimit();
   }
   ```
   
   When `recordLimiter.reachLimit()` is true, `hasNext()` is never called — no 
full-file scan after limit.
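
   The short-circuit behaviour can be checked with a small standalone sketch.
   It is an illustration under stated assumptions, not the operator code:
   `SimpleLimiter` is a hypothetical stand-in for `RecordLimiter`, and
   `CountingIterator` is a hypothetical wrapper that only counts `hasNext()`
   calls.

   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicLong;

   /** Toy model of the fixed loop. All classes are hypothetical stand-ins, not Paimon code. */
   final class ShortCircuitLimitDemo {

       /** Minimal limiter; assumed to resemble RecordLimiter only in spirit. */
       static final class SimpleLimiter {
           private final long limit;
           private final AtomicLong count = new AtomicLong();

           SimpleLimiter(long limit) {
               this.limit = limit;
           }

           boolean reachLimit() {
               return count.get() >= limit;
           }

           void increment() {
               count.incrementAndGet();
           }
       }

       /** Wraps an iterator and counts how often hasNext() is invoked. */
       static final class CountingIterator<T> implements Iterator<T> {
           private final Iterator<T> delegate;
           int hasNextCalls = 0;

           CountingIterator(Iterator<T> delegate) {
               this.delegate = delegate;
           }

           @Override
           public boolean hasNext() {
               hasNextCalls++;
               return delegate.hasNext();
           }

           @Override
           public T next() {
               return delegate.next();
           }
       }

       /** Fixed loop shape: the limit is tested first, so hasNext() is skipped once it is reached. */
       static <T> List<T> readWithLimit(Iterator<T> it, SimpleLimiter limiter) {
           List<T> out = new ArrayList<>();
           while (!(limiter != null && limiter.reachLimit()) && it.hasNext()) {
               out.add(it.next());
               if (limiter != null) {
                   limiter.increment();
               }
           }
           return out;
       }

       public static void main(String[] args) {
           List<Integer> rows = new ArrayList<>();
           for (int i = 0; i < 100; i++) {
               rows.add(i);
           }
           CountingIterator<Integer> it = new CountingIterator<>(rows.iterator());
           List<Integer> out = readWithLimit(it, new SimpleLimiter(10));
           System.out.println(out.size() + " " + it.hasNextCalls); // prints "10 10"
       }
   }
   ```

   With a limit of 10, `hasNext()` runs exactly 10 times, once per emitted row.
   Without a limiter (`null`), the same loop calls it 101 times for 100 rows.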
   
   ### Testing
   
   - `DedicatedSplitReadLimitTest` — 100-row split, `LIMIT 10` → 10 rows, 
`readBatch` called once
   - `OperatorSourceTest.testReadOperatorWithLimit` — limit=2 → exactly 2 rows
   - `BatchFileStoreITCase.testDedicatedPathLimitTenOnManyRows` — 100 rows 
INSERT, `LIMIT 10` → 10 rows
   
   ```bash
   mvn test -pl paimon-flink/paimon-flink-common \
     -Dtest=DedicatedSplitReadLimitTest,OperatorSourceTest#testReadOperatorWithLimit,BatchFileStoreITCase#testDedicatedPathLimitTenOnManyRows
   ```

