This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new df21442  fix(rust/sedona,python/sedonadb): Ensure empty batches are 
not included in RecordBatchReader output (#207)
df21442 is described below

commit df214426366a07e2bc1c46295672c8110b1511c9
Author: Dewey Dunnington <[email protected]>
AuthorDate: Mon Oct 13 03:05:38 2025 -0500

    fix(rust/sedona,python/sedonadb): Ensure empty batches are not included in 
RecordBatchReader output (#207)
---
 python/sedonadb/src/reader.rs           | 21 ++++++++++-----
 python/sedonadb/tests/test_dataframe.py | 36 +++++++++++++++++++++++--
 rust/sedona/src/reader.rs               | 47 ++++++++++++++++++++++++++++++---
 3 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/python/sedonadb/src/reader.rs b/python/sedonadb/src/reader.rs
index b00333a..4c13e7d 100644
--- a/python/sedonadb/src/reader.rs
+++ b/python/sedonadb/src/reader.rs
@@ -43,12 +43,21 @@ impl Iterator for PySedonaStreamReader {
     type Item = std::result::Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        match wait_for_future_from_rust(&self.runtime, self.stream.try_next()) 
{
-            Ok(maybe_batch) => match maybe_batch {
-                Ok(maybe_batch) => maybe_batch.map(Ok),
-                Err(err) => 
Some(Err(ArrowError::ExternalError(Box::new(err)))),
-            },
-            Err(err) => Some(Err(ArrowError::ExternalError(Box::new(err)))),
+        loop {
+            match wait_for_future_from_rust(&self.runtime, 
self.stream.try_next()) {
+                Ok(Ok(maybe_batch)) => match maybe_batch {
+                    Some(batch) => {
+                        if batch.num_rows() == 0 {
+                            continue;
+                        }
+
+                        return Some(Ok(batch));
+                    }
+                    None => return None,
+                },
+                Ok(Err(df_err)) => return 
Some(Err(ArrowError::ExternalError(Box::new(df_err)))),
+                Err(py_err) => return 
Some(Err(ArrowError::ExternalError(Box::new(py_err)))),
+            }
         }
     }
 }
diff --git a/python/sedonadb/tests/test_dataframe.py 
b/python/sedonadb/tests/test_dataframe.py
index 60ac995..8d488df 100644
--- a/python/sedonadb/tests/test_dataframe.py
+++ b/python/sedonadb/tests/test_dataframe.py
@@ -14,15 +14,18 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import tempfile
+from pathlib import Path
+
 import geoarrow.pyarrow as ga
 import geoarrow.types as gat
 import geopandas.testing
 import pandas as pd
-from pathlib import Path
 import pyarrow as pa
 import pytest
 import sedonadb
-import tempfile
+from sedonadb.testing import skip_if_not_exists
 
 
 def test_dataframe_from_dataframe(con):
@@ -255,6 +258,35 @@ def test_dataframe_to_arrow(con):
         df.to_arrow_table(schema=pa.schema({}))
 
 
+def test_dataframe_to_arrow_empty_batches(con, geoarrow_data):
+    # It's difficult to trigger this with a simpler example
+    # https://github.com/apache/sedona-db/issues/156
+    path_water_junc = (
+        geoarrow_data / "ns-water" / "files" / 
"ns-water_water-junc_geo.parquet"
+    )
+    path_water_point = (
+        geoarrow_data / "ns-water" / "files" / 
"ns-water_water-point_geo.parquet"
+    )
+    skip_if_not_exists(path_water_junc)
+    skip_if_not_exists(path_water_point)
+
+    con.read_parquet(path_water_junc).to_view("junc", overwrite=True)
+    con.read_parquet(path_water_point).to_view("point", overwrite=True)
+    con.sql("""SELECT geometry FROM junc WHERE "OBJECTID" = 1814""").to_view(
+        "junc_filter", overwrite=True
+    )
+
+    joined = con.sql("""
+        SELECT "OBJECTID", "FEAT_CODE", point.geometry
+        FROM point
+        JOIN junc_filter ON ST_DWithin(junc_filter.geometry, point.geometry, 
10000)
+    """)
+
+    reader = pa.RecordBatchReader.from_stream(joined)
+    batch_rows = [len(batch) for batch in reader]
+    assert batch_rows == [24]
+
+
 def test_dataframe_to_pandas(con):
     # Check with a geometry column
     df_with_geo = con.sql("SELECT 1 as one, ST_GeomFromWKT('POINT (0 1)') as 
geom")
diff --git a/rust/sedona/src/reader.rs b/rust/sedona/src/reader.rs
index 374f9e0..2b4673f 100644
--- a/rust/sedona/src/reader.rs
+++ b/rust/sedona/src/reader.rs
@@ -41,9 +41,20 @@ impl Iterator for SedonaStreamReader {
     type Item = std::result::Result<RecordBatch, ArrowError>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        match self.runtime.block_on(self.stream.try_next()) {
-            Ok(maybe_batch) => maybe_batch.map(Ok),
-            Err(err) => Some(Err(ArrowError::ExternalError(Box::new(err)))),
+        loop {
+            match self.runtime.block_on(self.stream.try_next()) {
+                Ok(maybe_batch) => match maybe_batch {
+                    Some(batch) => {
+                        if batch.num_rows() == 0 {
+                            continue;
+                        }
+
+                        return Some(Ok(batch));
+                    }
+                    None => return None,
+                },
+                Err(err) => return 
Some(Err(ArrowError::ExternalError(Box::new(err)))),
+            }
         }
     }
 }
@@ -57,7 +68,9 @@ impl RecordBatchReader for SedonaStreamReader {
 #[cfg(test)]
 mod test {
 
+    use arrow_array::record_batch;
     use arrow_schema::{DataType, Field, Schema};
+    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 
     use crate::context::SedonaContext;
 
@@ -82,4 +95,32 @@ mod test {
         assert_eq!(reader.next().unwrap().unwrap(), expected_batches[0]);
         assert!(reader.next().is_none());
     }
+
+    #[test]
+    fn reader_empty_chunks() {
+        let runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
+
+        let batch0 = record_batch!(
+            ("a", Int32, [1, 2, 3]),
+            ("b", Float64, [Some(4.0), None, Some(5.0)])
+        )
+        .expect("created batch");
+        let schema = batch0.schema();
+
+        let batch1 = RecordBatch::new_empty(schema.clone());
+        let batch2 = batch0.clone();
+
+        let stream = futures::stream::iter(vec![
+            Ok(batch0.clone()),
+            Ok(batch1.clone()),
+            Ok(batch2.clone()),
+        ]);
+        let adapter = RecordBatchStreamAdapter::new(schema, stream);
+        let batch_stream: SendableRecordBatchStream = Box::pin(adapter);
+
+        let mut reader = SedonaStreamReader::new(runtime, batch_stream);
+        assert_eq!(reader.next().unwrap().unwrap(), batch0);
+        assert_eq!(reader.next().unwrap().unwrap(), batch2);
+        assert!(reader.next().is_none());
+    }
 }

Reply via email to