Copilot commented on code in PR #36:
URL: https://github.com/apache/sedona-db/pull/36#discussion_r2329304774
##########
rust/sedona/src/record_batch_reader_provider.rs:
##########
@@ -208,12 +266,40 @@ mod test {
use arrow_array::{RecordBatch, RecordBatchIterator};
use arrow_schema::{DataType, Field, Schema};
- use datafusion::prelude::SessionContext;
+ use datafusion::prelude::{DataFrame, SessionContext};
+ use rstest::rstest;
use sedona_schema::datatypes::WKB_GEOMETRY;
use sedona_testing::create::create_array_storage;
use super::*;
+ fn create_test_batch(size: usize, start_id: i32) -> RecordBatch {
+ let schema = Schema::new(vec![Field::new("id", DataType::Int32,
false)]);
+ let ids: Vec<i32> = (start_id..start_id + size as i32).collect();
+ RecordBatch::try_new(
+ Arc::new(schema),
+ vec![Arc::new(arrow_array::Int32Array::from(ids))],
+ )
+ .unwrap()
+ }
+
+ fn create_test_reader(batch_sizes: Vec<usize>) -> Box<dyn
RecordBatchReader + Send> {
+ let mut start_id = 0i32;
+ let batches: Vec<RecordBatch> = batch_sizes
+ .into_iter()
+ .map(|size| {
+ let batch = create_test_batch(size, start_id);
+ start_id += size as i32;
+ batch
+ })
+ .collect();
Review Comment:
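Potential panic if `batches` is empty (for example, if the code that follows indexes `batches[0]` to obtain the schema). The `map` above produces one batch per entry in `batch_sizes`, and zero sizes still yield zero-row batches. So `batches` is empty exactly when `batch_sizes` is empty, not when it contains only zeros. Consider checking whether `batches` is empty, or requiring `batch_sizes` to be non-empty.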
```suggestion
.collect();
if batches.is_empty() {
panic!("create_test_reader: batch_sizes must contain at least
one non-zero value to create a non-empty batches vector");
}
```
##########
rust/sedona/src/record_batch_reader_provider.rs:
##########
@@ -88,26 +88,78 @@ impl TableProvider for RecordBatchReaderProvider {
_filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let mut writable_reader = self.reader.try_write().map_err(|_| {
- DataFusionError::Internal("Failed to acquire lock on
RecordBatchReader".to_string())
- })?;
- if let Some(reader) = writable_reader.take() {
+ let mut reader_guard = self.reader.lock();
+ if let Some(reader) = reader_guard.take() {
Ok(Arc::new(RecordBatchReaderExec::new(reader, limit)))
} else {
sedona_internal_err!("Can't scan RecordBatchReader provider more
than once")
}
}
}
+/// An iterator that limits the number of rows from a RecordBatchReader
+struct RowLimitedIterator {
+ reader: Option<Box<dyn RecordBatchReader + Send>>,
+ limit: usize,
+ rows_consumed: usize,
+}
+
+impl RowLimitedIterator {
+ fn new(reader: Box<dyn RecordBatchReader + Send>, limit: usize) -> Self {
+ Self {
+ reader: Some(reader),
+ limit,
+ rows_consumed: 0,
+ }
+ }
+}
+
+impl Iterator for RowLimitedIterator {
+ type Item = Result<arrow_array::RecordBatch>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ // Check if we have already consumed enough rows
+ if self.rows_consumed >= self.limit {
+ self.reader = None;
Review Comment:
Setting `self.reader = None` (line 123 in the PR) is unnecessary once the iterator has reached its limit. The reader will be dropped anyway when the iterator itself is dropped. This early cleanup also repeats the assignment on every subsequent call to `next()`, without significant benefit.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org