Copilot commented on code in PR #351:
URL: https://github.com/apache/fluss-rust/pull/351#discussion_r2820755275


##########
website/docs/user-guide/rust/example/log-tables.md:
##########
@@ -63,6 +63,21 @@ log_scanner.subscribe(0, 0).await?;
 // Poll for records
 let records = log_scanner.poll(Duration::from_secs(10)).await?;
 
+// Per-bucket access
+for (bucket, bucket_records) in records.records_by_buckets() {
+    println!("Bucket {}: {} records", bucket.bucket, bucket_records.len());

Review Comment:
   The Rust example code uses `bucket.bucket` to access the bucket ID, but the 
TableBucket struct has a private field named `bucket` and a public accessor 
method `bucket_id()`. The documentation should use `bucket.bucket_id()` instead 
of `bucket.bucket` to match the public API.
   ```suggestion
       println!("Bucket {}: {} records", bucket.bucket_id(), bucket_records.len());
   ```
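For readers following along, the accessor pattern the comment describes (private field, public getter) can be sketched in plain Rust. The `bucket` field and `bucket_id()` method names come from the review comment itself; the `new()` and `table_id()` helpers here are hypothetical, and the real `TableBucket` in fluss-rust may differ:

```rust
/// Hypothetical stand-in for the TableBucket described in the review
/// comment: the `bucket` field is private, so callers outside this
/// module must go through the public `bucket_id()` accessor.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TableBucket {
    table_id: i64, // hypothetical field; the real struct may differ
    bucket: i32,   // private: `bucket.bucket` fails outside this module
}

impl TableBucket {
    // Hypothetical constructor, added only so the sketch is usable.
    pub fn new(table_id: i64, bucket: i32) -> Self {
        Self { table_id, bucket }
    }

    /// Public accessor named in the review comment.
    pub fn bucket_id(&self) -> i32 {
        self.bucket
    }

    pub fn table_id(&self) -> i64 {
        self.table_id
    }
}
```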



##########
bindings/python/src/table.rs:
##########
@@ -155,6 +153,247 @@ impl RecordBatch {
     }
 }
 
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+    records_by_bucket: HashMap<TableBucket, Vec<Py<ScanRecord>>>,
+    total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+    /// List of distinct buckets that have records in this result.
+    pub fn buckets(&self) -> Vec<TableBucket> {
+        self.records_by_bucket.keys().cloned().collect()
+    }
+
+    /// Get records for a specific bucket.
+    ///
+    /// Returns an empty list if the bucket is not present (matches Rust/Java behavior).
+    pub fn records(&self, py: Python, bucket: &TableBucket) -> Vec<Py<ScanRecord>> {
+        self.records_by_bucket
+            .get(bucket)
+            .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+            .unwrap_or_default()
+    }
+
+    /// Total number of records across all buckets.
+    pub fn count(&self) -> usize {
+        self.total_count
+    }
+
+    /// Whether the result set is empty.
+    pub fn is_empty(&self) -> bool {
+        self.total_count == 0
+    }
+
+    fn __len__(&self) -> usize {
+        self.total_count
+    }
+
+    /// Type-dispatched indexing:
+    ///   records[0]       → ScanRecord (flat index)
+    ///   records[-1]      → ScanRecord (negative index)
+    ///   records[1:3]     → list[ScanRecord] (slice)
+    ///   records[bucket]  → list[ScanRecord] (by bucket)
+    fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) -> PyResult<Py<pyo3::PyAny>> {
+        // Try integer index first
+        if let Ok(mut idx) = key.extract::<isize>() {
+            let len = self.total_count as isize;
+            if idx < 0 {
+                idx += len;
+            }
+            if idx < 0 || idx >= len {
+                return Err(pyo3::exceptions::PyIndexError::new_err(format!(
+                    "index {idx} out of range for ScanRecords of size {len}"
+                )));
+            }
+            let idx = idx as usize;
+            let mut offset = 0;
+            for recs in self.records_by_bucket.values() {
+                if idx < offset + recs.len() {
+                    return Ok(recs[idx - offset].clone_ref(py).into_any());
+                }
+                offset += recs.len();
+            }
+            return Err(pyo3::exceptions::PyRuntimeError::new_err(
+                "internal error: total_count out of sync with records",
+            ));
+        }
+        // Try slice
+        if let Ok(slice) = key.downcast::<pyo3::types::PySlice>() {
+            let indices = slice.indices(self.total_count as isize)?;
+            let mut result: Vec<Py<ScanRecord>> = Vec::new();
+            let mut i = indices.start;
+            while (indices.step > 0 && i < indices.stop) || (indices.step < 0 && i > indices.stop) {
+                let idx = i as usize;
+                let mut offset = 0;
+                for recs in self.records_by_bucket.values() {
+                    if idx < offset + recs.len() {
+                        result.push(recs[idx - offset].clone_ref(py));
+                        break;
+                    }
+                    offset += recs.len();
+                }
+                i += indices.step;
+            }

Review Comment:
   The slice implementation doesn't correctly handle negative steps. When 
`indices.step < 0`, the code should iterate backwards (from high indices to low 
indices), but the inner loop (lines 232-238) always iterates forward through 
buckets. This means slices like `scan_records[::-1]` or `scan_records[10:0:-1]` 
will not return records in the correct order. The implementation needs to 
either convert each negative index to the corresponding record lookup, or 
maintain a consistent bucket ordering that allows proper reverse iteration.
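The first fix the comment suggests (converting each slice index to a direct record lookup) can be sketched in plain Rust without pyo3. Here `u32` bucket ids and `String` records stand in for the real `TableBucket`/`ScanRecord` types, and `start`/`stop`/`step` are assumed to be already normalized the way Python's `slice.indices()` normalizes them:

```rust
use std::collections::HashMap;

// Sketch: flatten the per-bucket record lists once (in a fixed, sorted
// bucket order), then resolve every slice index against that flat view.
// Because each index is looked up independently, negative steps such as
// Python's records[::-1] come out in the right order.
fn slice_records(
    records_by_bucket: &HashMap<u32, Vec<String>>,
    start: isize,
    stop: isize,
    step: isize,
) -> Vec<String> {
    // Capture a consistent bucket order, then flatten.
    let mut bucket_ids: Vec<u32> = records_by_bucket.keys().copied().collect();
    bucket_ids.sort_unstable();
    let flat: Vec<&String> = bucket_ids
        .iter()
        .flat_map(|id| records_by_bucket[id].iter())
        .collect();

    // Walk the slice indices exactly as the original while-loop does,
    // but index into the flat view instead of re-walking the buckets.
    let mut out = Vec::new();
    let mut i = start;
    while (step > 0 && i < stop) || (step < 0 && i > stop) {
        out.push(flat[i as usize].clone());
        i += step;
    }
    out
}
```

The flattening costs one allocation per `__getitem__` call, but it makes forward and reverse slices symmetric and reuses a single traversal order.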



##########
bindings/python/src/table.rs:
##########
@@ -155,6 +153,247 @@ impl RecordBatch {
     }
 }
 
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+    records_by_bucket: HashMap<TableBucket, Vec<Py<ScanRecord>>>,
+    total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+    /// List of distinct buckets that have records in this result.
+    pub fn buckets(&self) -> Vec<TableBucket> {
+        self.records_by_bucket.keys().cloned().collect()
+    }
+
+    /// Get records for a specific bucket.
+    ///
+    /// Returns an empty list if the bucket is not present (matches Rust/Java behavior).
+    pub fn records(&self, py: Python, bucket: &TableBucket) -> Vec<Py<ScanRecord>> {
+        self.records_by_bucket
+            .get(bucket)
+            .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+            .unwrap_or_default()
+    }
+
+    /// Total number of records across all buckets.
+    pub fn count(&self) -> usize {
+        self.total_count
+    }
+
+    /// Whether the result set is empty.
+    pub fn is_empty(&self) -> bool {
+        self.total_count == 0
+    }
+
+    fn __len__(&self) -> usize {
+        self.total_count
+    }
+
+    /// Type-dispatched indexing:
+    ///   records[0]       → ScanRecord (flat index)
+    ///   records[-1]      → ScanRecord (negative index)
+    ///   records[1:3]     → list[ScanRecord] (slice)
+    ///   records[bucket]  → list[ScanRecord] (by bucket)
+    fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) -> PyResult<Py<pyo3::PyAny>> {
+        // Try integer index first
+        if let Ok(mut idx) = key.extract::<isize>() {
+            let len = self.total_count as isize;
+            if idx < 0 {
+                idx += len;
+            }
+            if idx < 0 || idx >= len {
+                return Err(pyo3::exceptions::PyIndexError::new_err(format!(
+                    "index {idx} out of range for ScanRecords of size {len}"
+                )));
+            }
+            let idx = idx as usize;
+            let mut offset = 0;
+            for recs in self.records_by_bucket.values() {
+                if idx < offset + recs.len() {
+                    return Ok(recs[idx - offset].clone_ref(py).into_any());
+                }
+                offset += recs.len();
+            }

Review Comment:
   The flat indexing (`__getitem__` with int/slice) and iteration (`__iter__`) 
use different mechanisms to traverse buckets. Indexing directly iterates 
`self.records_by_bucket.values()` on each call (lines 215, 233), while 
iteration captures keys once in a Vec (line 305). This means the bucket 
traversal order can differ between indexing and iteration operations on the 
same ScanRecords instance. For example, `scan_records[0]` might return a 
different record than `next(iter(scan_records))` due to HashMap iteration 
non-determinism. Consider capturing bucket keys in a consistent order (e.g., 
sorted or insertion-ordered) to ensure that all access patterns on the same 
ScanRecords instance use the same bucket ordering.
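The fix suggested here (capturing the bucket keys once in a consistent order and routing every access pattern through that captured order) might look roughly like this in plain Rust. pyo3 is omitted, and `u32`/`String` stand in for `TableBucket`/`ScanRecord`:

```rust
use std::collections::HashMap;

// Sketch: the sorted key list is computed once at construction, so flat
// indexing and iteration always traverse buckets identically, even though
// the backing HashMap has no deterministic iteration order.
struct ScanRecordsSketch {
    ordered_buckets: Vec<u32>, // captured once, sorted
    records_by_bucket: HashMap<u32, Vec<String>>,
}

impl ScanRecordsSketch {
    fn new(records_by_bucket: HashMap<u32, Vec<String>>) -> Self {
        let mut ordered_buckets: Vec<u32> =
            records_by_bucket.keys().copied().collect();
        ordered_buckets.sort_unstable();
        Self { ordered_buckets, records_by_bucket }
    }

    // Flat indexing walks buckets in the captured order.
    fn get(&self, mut idx: usize) -> Option<&String> {
        for bucket in &self.ordered_buckets {
            let recs = &self.records_by_bucket[bucket];
            if idx < recs.len() {
                return Some(&recs[idx]);
            }
            idx -= recs.len();
        }
        None
    }

    // Iteration reuses the same captured order, so get(0) always matches
    // the first item yielded here.
    fn iter(&self) -> impl Iterator<Item = &String> + '_ {
        self.ordered_buckets
            .iter()
            .flat_map(move |b| self.records_by_bucket[b].iter())
    }
}
```

Sorting also gives callers a documented, reproducible order across `poll()` calls, which an insertion-ordered capture would not.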



##########
website/docs/user-guide/python/api-reference.md:
##########
@@ -137,17 +137,69 @@ Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`.
 | `.subscribe_partition_buckets(partition_bucket_offsets)`      | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) |
 | `.unsubscribe(bucket_id)`                                     | Unsubscribe from a bucket (non-partitioned tables)                               |
 | `.unsubscribe_partition(partition_id, bucket_id)`             | Unsubscribe from a partition bucket                                              |
-| `.poll(timeout_ms) -> list[ScanRecord]`                       | Poll individual records (record scanner only)                                    |
+| `.poll(timeout_ms) -> ScanRecords`                            | Poll individual records (record scanner only)                                    |
 | `.poll_arrow(timeout_ms) -> pa.Table`                         | Poll as Arrow Table (batch scanner only)                                         |
 | `.poll_record_batch(timeout_ms) -> list[RecordBatch]`         | Poll batches with metadata (batch scanner only)                                  |
 | `.to_arrow() -> pa.Table`                                     | Read all subscribed data as Arrow Table (batch scanner only)                     |
 | `.to_pandas() -> pd.DataFrame`                                | Read all subscribed data as DataFrame (batch scanner only)                       |
 
+## `ScanRecords`
+
+Returned by `LogScanner.poll()`. Records are grouped by bucket.
+
+> **Note:** Flat iteration and integer indexing traverse buckets in an arbitrary order that is consistent within a single `ScanRecords` instance but may differ between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters.

Review Comment:
   The note states that flat iteration and integer indexing "traverse buckets 
in an arbitrary order that is consistent within a single ScanRecords instance". 
However, this is not accurate. Due to HashMap iteration non-determinism in the 
implementation, the bucket traversal order can actually differ between 
different operations on the same ScanRecords instance. For example, 
`scan_records[0]` (which calls `__getitem__`) and `next(iter(scan_records))` 
(which calls `__iter__`) may return different records because they iterate over 
the HashMap differently. The note should clarify that the order is consistent 
within a single iteration/access operation but may vary between different 
operations on the same instance.
   ```suggestion
   > **Note:** Flat iteration and integer indexing traverse buckets in an arbitrary, implementation-dependent order. The order is only consistent within a single iteration or indexing operation and may differ between different operations on the same `ScanRecords` instance (for example, `scan_records[0]` vs `next(iter(scan_records))`) and between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
