JingsongLi commented on code in PR #230:
URL: https://github.com/apache/paimon-rust/pull/230#discussion_r3055643843


##########
crates/paimon/src/arrow/format/parquet.rs:
##########
@@ -809,14 +831,73 @@ impl AsyncFileReader for ArrowFileReader {
         self.read_bytes(range)
     }
 
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let coalesce_bytes = self.range_coalesce_bytes;
+        let concurrency = self.range_fetch_concurrency.max(1);
+
+        async move {
+            if ranges.is_empty() {
+                return Ok(vec![]);
+            }
+
+            // Two-phase range optimization:
+            // Phase 1: Merge nearby ranges based on coalesce threshold.
+            let coalesced = merge_byte_ranges(&ranges, coalesce_bytes);
+            // Phase 2: Split large merged ranges to utilize concurrency,
+            // but only at original range boundaries.
+            let fetch_ranges = split_ranges_for_concurrency(coalesced, 
&ranges, concurrency);
+
+            // Fetch merged ranges concurrently.
+            let r = &self.r;
+            let fetched: Vec<Bytes> = if fetch_ranges.len() <= concurrency {
+                // All ranges fit within the concurrency limit — fire them all 
at once.
+                futures::future::try_join_all(fetch_ranges.iter().map(|range| {
+                    r.read(range.clone())
+                        .map_err(|e| 
parquet::errors::ParquetError::External(format!("{e}").into()))
+                }))
+                .await?
+            } else {
+                // More ranges than concurrency slots — use buffered stream.
+                futures::stream::iter(fetch_ranges.iter().cloned())
+                    .map(|range| async move {
+                        r.read(range).await.map_err(|e| {
+                            
parquet::errors::ParquetError::External(format!("{e}").into())
+                        })
+                    })
+                    .buffered(concurrency)
+                    .try_collect()
+                    .await?
+            };
+
+            // Slice the fetched data back into the originally requested 
ranges.
+            Ok(ranges
+                .iter()
+                .map(|range| {
+                    let idx = fetch_ranges.partition_point(|v| v.start <= 
range.start) - 1;
+                    let fetch_range = &fetch_ranges[idx];
+                    let fetch_bytes = &fetched[idx];
+                    let start = (range.start - fetch_range.start) as usize;
+                    let end = (range.end - fetch_range.start) as usize;
+                    fetch_bytes.slice(start..end.min(fetch_bytes.len()))

Review Comment:
   If the fetch returns fewer bytes than the range requires, truncated data is 
silently returned here, and downstream readers may receive incomplete column 
chunks, leading to parsing errors or data corruption. I suggest asserting on 
the length or returning an error instead of silently swallowing the short read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to