leekeiabstraction commented on code in PR #351:
URL: https://github.com/apache/fluss-rust/pull/351#discussion_r2822268284
##########
bindings/python/src/table.rs:
##########
@@ -155,6 +154,247 @@ impl RecordBatch {
}
}
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+ records_by_bucket: IndexMap<TableBucket, Vec<Py<ScanRecord>>>,
+ total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+ /// List of distinct buckets that have records in this result.
+ pub fn buckets(&self) -> Vec<TableBucket> {
+ self.records_by_bucket.keys().cloned().collect()
+ }
+
+ /// Get records for a specific bucket.
+ ///
+ /// Returns an empty list if the bucket is not present (matches Rust/Java
behavior).
+ pub fn records(&self, py: Python, bucket: &TableBucket) ->
Vec<Py<ScanRecord>> {
+ self.records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+ .unwrap_or_default()
+ }
+
+ /// Total number of records across all buckets.
+ pub fn count(&self) -> usize {
+ self.total_count
+ }
+
+ /// Whether the result set is empty.
+ pub fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn __len__(&self) -> usize {
+ self.total_count
+ }
+
+ /// Type-dispatched indexing:
+ /// records[0] → ScanRecord (flat index)
+ /// records[-1] → ScanRecord (negative index)
+ /// records[1:3] → list[ScanRecord] (slice)
+ /// records[bucket] → list[ScanRecord] (by bucket)
+ fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) ->
PyResult<Py<pyo3::PyAny>> {
Review Comment:
nit: pyo3:: not needed
##########
bindings/python/src/table.rs:
##########
@@ -155,6 +154,247 @@ impl RecordBatch {
}
}
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+ records_by_bucket: IndexMap<TableBucket, Vec<Py<ScanRecord>>>,
+ total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+ /// List of distinct buckets that have records in this result.
+ pub fn buckets(&self) -> Vec<TableBucket> {
+ self.records_by_bucket.keys().cloned().collect()
+ }
+
+ /// Get records for a specific bucket.
+ ///
+ /// Returns an empty list if the bucket is not present (matches Rust/Java
behavior).
+ pub fn records(&self, py: Python, bucket: &TableBucket) ->
Vec<Py<ScanRecord>> {
+ self.records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+ .unwrap_or_default()
+ }
+
+ /// Total number of records across all buckets.
+ pub fn count(&self) -> usize {
+ self.total_count
+ }
+
+ /// Whether the result set is empty.
+ pub fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn __len__(&self) -> usize {
+ self.total_count
+ }
+
+ /// Type-dispatched indexing:
+ /// records[0] → ScanRecord (flat index)
+ /// records[-1] → ScanRecord (negative index)
+ /// records[1:3] → list[ScanRecord] (slice)
+ /// records[bucket] → list[ScanRecord] (by bucket)
+ fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) ->
PyResult<Py<pyo3::PyAny>> {
+ // Try integer index first
+ if let Ok(mut idx) = key.extract::<isize>() {
+ let len = self.total_count as isize;
+ if idx < 0 {
+ idx += len;
+ }
+ if idx < 0 || idx >= len {
+ return Err(pyo3::exceptions::PyIndexError::new_err(format!(
+ "index {idx} out of range for ScanRecords of size {len}"
+ )));
+ }
+ let idx = idx as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ return Ok(recs[idx - offset].clone_ref(py).into_any());
+ }
+ offset += recs.len();
+ }
+ return Err(pyo3::exceptions::PyRuntimeError::new_err(
+ "internal error: total_count out of sync with records",
+ ));
+ }
+ // Try slice
+ if let Ok(slice) = key.downcast::<pyo3::types::PySlice>() {
+ let indices = slice.indices(self.total_count as isize)?;
+ let mut result: Vec<Py<ScanRecord>> = Vec::new();
+ let mut i = indices.start;
+ while (indices.step > 0 && i < indices.stop) || (indices.step < 0
&& i > indices.stop) {
+ let idx = i as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ result.push(recs[idx - offset].clone_ref(py));
+ break;
+ }
+ offset += recs.len();
+ }
+ i += indices.step;
+ }
+ return Ok(result.into_pyobject(py).unwrap().into_any().unbind());
+ }
+ // Try TableBucket
+ if let Ok(bucket) = key.extract::<TableBucket>() {
+ let recs = self.records(py, &bucket);
+ return Ok(recs.into_pyobject(py).unwrap().into_any().unbind());
+ }
+ Err(pyo3::exceptions::PyTypeError::new_err(
+ "index must be int, slice, or TableBucket",
+ ))
+ }
+
+ /// Support `bucket in records`.
+ fn __contains__(&self, bucket: &TableBucket) -> bool {
+ self.records_by_bucket.contains_key(bucket)
+ }
+
+ /// Mapping protocol: alias for `buckets()`.
+ pub fn keys(&self) -> Vec<TableBucket> {
+ self.buckets()
+ }
+
+ /// Mapping protocol: lazy iterator over record lists, one per bucket.
+ pub fn values(slf: Bound<'_, Self>) -> ScanRecordsBucketIter {
+ let this = slf.borrow();
+ let bucket_keys: Vec<TableBucket> =
this.records_by_bucket.keys().cloned().collect();
+ drop(this);
+ ScanRecordsBucketIter {
+ owner: slf.unbind(),
+ bucket_keys,
+ bucket_idx: 0,
+ with_keys: false,
+ }
+ }
+
+ /// Mapping protocol: lazy iterator over `(TableBucket, list[ScanRecord])`
pairs.
+ pub fn items(slf: Bound<'_, Self>) -> ScanRecordsBucketIter {
+ let this = slf.borrow();
+ let bucket_keys: Vec<TableBucket> =
this.records_by_bucket.keys().cloned().collect();
+ drop(this);
+ ScanRecordsBucketIter {
+ owner: slf.unbind(),
+ bucket_keys,
+ bucket_idx: 0,
+ with_keys: true,
+ }
+ }
+
+ fn __str__(&self) -> String {
+ format!(
+ "ScanRecords(records={}, buckets={})",
+ self.total_count,
+ self.records_by_bucket.len()
+ )
+ }
+
+ fn __repr__(&self) -> String {
+ self.__str__()
+ }
+
+ /// Flat iterator over all records across all buckets (matches Java/Rust).
+ fn __iter__(slf: Bound<'_, Self>) -> ScanRecordsIter {
+ let this = slf.borrow();
+ let bucket_keys: Vec<TableBucket> =
this.records_by_bucket.keys().cloned().collect();
+ drop(this);
+ ScanRecordsIter {
+ owner: slf.unbind(),
+ bucket_keys,
+ bucket_idx: 0,
+ rec_idx: 0,
+ }
+ }
+}
+
+#[pyclass]
+struct ScanRecordsIter {
+ owner: Py<ScanRecords>,
Review Comment:
nit: borrower?
##########
bindings/python/src/table.rs:
##########
@@ -155,6 +154,247 @@ impl RecordBatch {
}
}
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+ records_by_bucket: IndexMap<TableBucket, Vec<Py<ScanRecord>>>,
+ total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+ /// List of distinct buckets that have records in this result.
+ pub fn buckets(&self) -> Vec<TableBucket> {
+ self.records_by_bucket.keys().cloned().collect()
+ }
+
+ /// Get records for a specific bucket.
+ ///
+ /// Returns an empty list if the bucket is not present (matches Rust/Java
behavior).
+ pub fn records(&self, py: Python, bucket: &TableBucket) ->
Vec<Py<ScanRecord>> {
+ self.records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+ .unwrap_or_default()
+ }
+
+ /// Total number of records across all buckets.
+ pub fn count(&self) -> usize {
+ self.total_count
+ }
+
+ /// Whether the result set is empty.
+ pub fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn __len__(&self) -> usize {
+ self.total_count
+ }
+
+ /// Type-dispatched indexing:
+ /// records[0] → ScanRecord (flat index)
+ /// records[-1] → ScanRecord (negative index)
+ /// records[1:3] → list[ScanRecord] (slice)
+ /// records[bucket] → list[ScanRecord] (by bucket)
+ fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) ->
PyResult<Py<pyo3::PyAny>> {
+ // Try integer index first
+ if let Ok(mut idx) = key.extract::<isize>() {
+ let len = self.total_count as isize;
+ if idx < 0 {
+ idx += len;
+ }
+ if idx < 0 || idx >= len {
+ return Err(pyo3::exceptions::PyIndexError::new_err(format!(
Review Comment:
nit: use import instead of full namespace
##########
bindings/python/src/table.rs:
##########
@@ -155,6 +154,247 @@ impl RecordBatch {
}
}
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+ records_by_bucket: IndexMap<TableBucket, Vec<Py<ScanRecord>>>,
+ total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+ /// List of distinct buckets that have records in this result.
+ pub fn buckets(&self) -> Vec<TableBucket> {
+ self.records_by_bucket.keys().cloned().collect()
+ }
+
+ /// Get records for a specific bucket.
+ ///
+ /// Returns an empty list if the bucket is not present (matches Rust/Java
behavior).
+ pub fn records(&self, py: Python, bucket: &TableBucket) ->
Vec<Py<ScanRecord>> {
+ self.records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+ .unwrap_or_default()
+ }
+
+ /// Total number of records across all buckets.
+ pub fn count(&self) -> usize {
+ self.total_count
+ }
+
+ /// Whether the result set is empty.
+ pub fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn __len__(&self) -> usize {
+ self.total_count
+ }
+
+ /// Type-dispatched indexing:
+ /// records[0] → ScanRecord (flat index)
+ /// records[-1] → ScanRecord (negative index)
+ /// records[1:3] → list[ScanRecord] (slice)
+ /// records[bucket] → list[ScanRecord] (by bucket)
+ fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) ->
PyResult<Py<pyo3::PyAny>> {
+ // Try integer index first
+ if let Ok(mut idx) = key.extract::<isize>() {
+ let len = self.total_count as isize;
+ if idx < 0 {
+ idx += len;
+ }
+ if idx < 0 || idx >= len {
+ return Err(pyo3::exceptions::PyIndexError::new_err(format!(
+ "index {idx} out of range for ScanRecords of size {len}"
+ )));
+ }
+ let idx = idx as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ return Ok(recs[idx - offset].clone_ref(py).into_any());
+ }
+ offset += recs.len();
+ }
+ return Err(pyo3::exceptions::PyRuntimeError::new_err(
Review Comment:
nit: use import instead of full namespace
##########
bindings/python/src/table.rs:
##########
@@ -155,6 +154,247 @@ impl RecordBatch {
}
}
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+ records_by_bucket: IndexMap<TableBucket, Vec<Py<ScanRecord>>>,
+ total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+ /// List of distinct buckets that have records in this result.
+ pub fn buckets(&self) -> Vec<TableBucket> {
+ self.records_by_bucket.keys().cloned().collect()
+ }
+
+ /// Get records for a specific bucket.
+ ///
+ /// Returns an empty list if the bucket is not present (matches Rust/Java
behavior).
+ pub fn records(&self, py: Python, bucket: &TableBucket) ->
Vec<Py<ScanRecord>> {
+ self.records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+ .unwrap_or_default()
+ }
+
+ /// Total number of records across all buckets.
+ pub fn count(&self) -> usize {
+ self.total_count
+ }
+
+ /// Whether the result set is empty.
+ pub fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn __len__(&self) -> usize {
+ self.total_count
+ }
+
+ /// Type-dispatched indexing:
+ /// records[0] → ScanRecord (flat index)
+ /// records[-1] → ScanRecord (negative index)
+ /// records[1:3] → list[ScanRecord] (slice)
+ /// records[bucket] → list[ScanRecord] (by bucket)
+ fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) ->
PyResult<Py<pyo3::PyAny>> {
+ // Try integer index first
+ if let Ok(mut idx) = key.extract::<isize>() {
+ let len = self.total_count as isize;
+ if idx < 0 {
+ idx += len;
+ }
+ if idx < 0 || idx >= len {
+ return Err(pyo3::exceptions::PyIndexError::new_err(format!(
+ "index {idx} out of range for ScanRecords of size {len}"
+ )));
+ }
+ let idx = idx as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ return Ok(recs[idx - offset].clone_ref(py).into_any());
+ }
+ offset += recs.len();
+ }
+ return Err(pyo3::exceptions::PyRuntimeError::new_err(
+ "internal error: total_count out of sync with records",
+ ));
+ }
+ // Try slice
+ if let Ok(slice) = key.downcast::<pyo3::types::PySlice>() {
+ let indices = slice.indices(self.total_count as isize)?;
+ let mut result: Vec<Py<ScanRecord>> = Vec::new();
+ let mut i = indices.start;
+ while (indices.step > 0 && i < indices.stop) || (indices.step < 0
&& i > indices.stop) {
+ let idx = i as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ result.push(recs[idx - offset].clone_ref(py));
+ break;
+ }
+ offset += recs.len();
+ }
+ i += indices.step;
+ }
+ return Ok(result.into_pyobject(py).unwrap().into_any().unbind());
+ }
+ // Try TableBucket
+ if let Ok(bucket) = key.extract::<TableBucket>() {
+ let recs = self.records(py, &bucket);
+ return Ok(recs.into_pyobject(py).unwrap().into_any().unbind());
+ }
+ Err(pyo3::exceptions::PyTypeError::new_err(
+ "index must be int, slice, or TableBucket",
+ ))
+ }
+
+ /// Support `bucket in records`.
+ fn __contains__(&self, bucket: &TableBucket) -> bool {
+ self.records_by_bucket.contains_key(bucket)
+ }
+
+ /// Mapping protocol: alias for `buckets()`.
+ pub fn keys(&self) -> Vec<TableBucket> {
+ self.buckets()
+ }
+
+ /// Mapping protocol: lazy iterator over record lists, one per bucket.
+ pub fn values(slf: Bound<'_, Self>) -> ScanRecordsBucketIter {
+ let this = slf.borrow();
+ let bucket_keys: Vec<TableBucket> =
this.records_by_bucket.keys().cloned().collect();
+ drop(this);
+ ScanRecordsBucketIter {
+ owner: slf.unbind(),
+ bucket_keys,
+ bucket_idx: 0,
+ with_keys: false,
+ }
+ }
+
+ /// Mapping protocol: lazy iterator over `(TableBucket, list[ScanRecord])`
pairs.
+ pub fn items(slf: Bound<'_, Self>) -> ScanRecordsBucketIter {
+ let this = slf.borrow();
+ let bucket_keys: Vec<TableBucket> =
this.records_by_bucket.keys().cloned().collect();
+ drop(this);
+ ScanRecordsBucketIter {
+ owner: slf.unbind(),
+ bucket_keys,
+ bucket_idx: 0,
+ with_keys: true,
+ }
+ }
+
+ fn __str__(&self) -> String {
+ format!(
+ "ScanRecords(records={}, buckets={})",
+ self.total_count,
+ self.records_by_bucket.len()
+ )
+ }
+
+ fn __repr__(&self) -> String {
+ self.__str__()
+ }
+
+ /// Flat iterator over all records across all buckets (matches Java/Rust).
+ fn __iter__(slf: Bound<'_, Self>) -> ScanRecordsIter {
+ let this = slf.borrow();
+ let bucket_keys: Vec<TableBucket> =
this.records_by_bucket.keys().cloned().collect();
+ drop(this);
+ ScanRecordsIter {
+ owner: slf.unbind(),
+ bucket_keys,
+ bucket_idx: 0,
+ rec_idx: 0,
+ }
+ }
+}
+
+#[pyclass]
+struct ScanRecordsIter {
+ owner: Py<ScanRecords>,
+ bucket_keys: Vec<TableBucket>,
+ bucket_idx: usize,
+ rec_idx: usize,
+}
+
+#[pymethods]
+impl ScanRecordsIter {
+ fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
+ slf
+ }
+
+ fn __next__(&mut self, py: Python) -> Option<Py<ScanRecord>> {
+ let owner = self.owner.borrow(py);
+ loop {
+ if self.bucket_idx >= self.bucket_keys.len() {
+ return None;
+ }
+ let bucket = &self.bucket_keys[self.bucket_idx];
+ if let Some(recs) = owner.records_by_bucket.get(bucket) {
+ if self.rec_idx < recs.len() {
+ let rec = recs[self.rec_idx].clone_ref(py);
+ self.rec_idx += 1;
+ return Some(rec);
+ }
+ }
+ self.bucket_idx += 1;
+ self.rec_idx = 0;
+ }
+ }
+}
+
+/// Lazy iterator for `ScanRecords.items()` and `ScanRecords.values()`.
+///
+/// Yields one bucket at a time: `(TableBucket, list[ScanRecord])` for items,
+/// or `list[ScanRecord]` for values. Only materializes records for the
+/// current bucket on each `__next__` call.
+#[pyclass]
+pub struct ScanRecordsBucketIter {
+ owner: Py<ScanRecords>,
Review Comment:
nit: borrower?
##########
bindings/python/src/table.rs:
##########
@@ -155,6 +154,247 @@ impl RecordBatch {
}
}
+/// A collection of scan records grouped by bucket.
+///
+/// Returned by `LogScanner.poll()`. Records are grouped by `TableBucket`.
+#[pyclass]
+pub struct ScanRecords {
+ records_by_bucket: IndexMap<TableBucket, Vec<Py<ScanRecord>>>,
+ total_count: usize,
+}
+
+#[pymethods]
+impl ScanRecords {
+ /// List of distinct buckets that have records in this result.
+ pub fn buckets(&self) -> Vec<TableBucket> {
+ self.records_by_bucket.keys().cloned().collect()
+ }
+
+ /// Get records for a specific bucket.
+ ///
+ /// Returns an empty list if the bucket is not present (matches Rust/Java
behavior).
+ pub fn records(&self, py: Python, bucket: &TableBucket) ->
Vec<Py<ScanRecord>> {
+ self.records_by_bucket
+ .get(bucket)
+ .map(|recs| recs.iter().map(|r| r.clone_ref(py)).collect())
+ .unwrap_or_default()
+ }
+
+ /// Total number of records across all buckets.
+ pub fn count(&self) -> usize {
+ self.total_count
+ }
+
+ /// Whether the result set is empty.
+ pub fn is_empty(&self) -> bool {
+ self.total_count == 0
+ }
+
+ fn __len__(&self) -> usize {
+ self.total_count
+ }
+
+ /// Type-dispatched indexing:
+ /// records[0] → ScanRecord (flat index)
+ /// records[-1] → ScanRecord (negative index)
+ /// records[1:3] → list[ScanRecord] (slice)
+ /// records[bucket] → list[ScanRecord] (by bucket)
+ fn __getitem__(&self, py: Python, key: &Bound<'_, pyo3::PyAny>) ->
PyResult<Py<pyo3::PyAny>> {
+ // Try integer index first
+ if let Ok(mut idx) = key.extract::<isize>() {
+ let len = self.total_count as isize;
+ if idx < 0 {
+ idx += len;
+ }
+ if idx < 0 || idx >= len {
+ return Err(pyo3::exceptions::PyIndexError::new_err(format!(
+ "index {idx} out of range for ScanRecords of size {len}"
+ )));
+ }
+ let idx = idx as usize;
+ let mut offset = 0;
+ for recs in self.records_by_bucket.values() {
+ if idx < offset + recs.len() {
+ return Ok(recs[idx - offset].clone_ref(py).into_any());
+ }
+ offset += recs.len();
+ }
+ return Err(pyo3::exceptions::PyRuntimeError::new_err(
+ "internal error: total_count out of sync with records",
+ ));
+ }
+ // Try slice
+ if let Ok(slice) = key.downcast::<pyo3::types::PySlice>() {
Review Comment:
nit: use import instead of full namespace
Similarly for other classes/parts in the PR
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]