This is an automated email from the ASF dual-hosted git repository.
kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 01f56a9e chore(rust/sedona-spatial-join): More accurate batch
in-memory size estimation (#515)
01f56a9e is described below
commit 01f56a9e3c647c7bab9c0b868757f5ec7e11558f
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Fri Jan 16 10:03:57 2026 +0800
chore(rust/sedona-spatial-join): More accurate batch in-memory size
estimation (#515)
This patch improves the accuracy of memory usage estimation by implementing
our own functions for estimating the in-memory sizes of record batches and
arrow arrays.
The rationale is similar to that of
https://github.com/apache/datafusion/pull/13377: if we call
`RecordBatch::get_array_memory_size` instead of using our own memory usage
estimation functions, we get wildly inaccurate numbers for spilled batches
that are read back using `arrow::ipc::reader::StreamReader`.
Future work: use the memory pool API of arrow-rs for more accurate memory
usage accounting. See https://github.com/apache/arrow-rs/issues/8137.
Co-authored-by: Dewey Dunnington <[email protected]>
---
rust/sedona-spatial-join/src/evaluated_batch.rs | 11 +-
.../src/index/build_side_collector.rs | 2 +-
.../sedona-spatial-join/src/index/spatial_index.rs | 20 +-
.../src/index/spatial_index_builder.rs | 7 +-
rust/sedona-spatial-join/src/operand_evaluator.rs | 16 +-
rust/sedona-spatial-join/src/utils.rs | 1 +
rust/sedona-spatial-join/src/utils/arrow_utils.rs | 291 +++++++++++++++++++++
7 files changed, 323 insertions(+), 25 deletions(-)
diff --git a/rust/sedona-spatial-join/src/evaluated_batch.rs
b/rust/sedona-spatial-join/src/evaluated_batch.rs
index ff67f171..d44d49ec 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch.rs
@@ -16,11 +16,14 @@
// under the License.
use arrow_array::RecordBatch;
+use datafusion_common::Result;
use datafusion_expr::ColumnarValue;
use geo::Rect;
use wkb::reader::Wkb;
-use crate::operand_evaluator::EvaluatedGeometryArray;
+use crate::{
+ operand_evaluator::EvaluatedGeometryArray,
utils::arrow_utils::get_record_batch_memory_size,
+};
/// EvaluatedBatch contains the original record batch from the input stream
and the evaluated
/// geometry array.
@@ -34,12 +37,14 @@ pub(crate) struct EvaluatedBatch {
}
impl EvaluatedBatch {
- pub fn in_mem_size(&self) -> usize {
+ pub fn in_mem_size(&self) -> Result<usize> {
// NOTE: sometimes `geom_array` will reuse the memory of `batch`,
especially when
// the expression for evaluating the geometry is a simple column
reference. In this case,
// the in_mem_size will be overestimated. It is a conservative
estimation so there's no risk
// of running out of memory because of underestimation.
- self.batch.get_array_memory_size() + self.geom_array.in_mem_size()
+ let record_batch_size = get_record_batch_memory_size(&self.batch)?;
+ let geom_array_size = self.geom_array.in_mem_size()?;
+ Ok(record_batch_size + geom_array_size)
}
pub fn num_rows(&self) -> usize {
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs
b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index 6fafa441..ab3e8e27 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -112,7 +112,7 @@ impl BuildSideBatchesCollector {
geom_array,
};
- let in_mem_size = build_side_batch.in_mem_size();
+ let in_mem_size = build_side_batch.in_mem_size()?;
metrics.num_batches.add(1);
metrics.num_rows.add(build_side_batch.num_rows());
metrics.total_size_bytes.add(in_mem_size);
diff --git a/rust/sedona-spatial-join/src/index/spatial_index.rs
b/rust/sedona-spatial-join/src/index/spatial_index.rs
index 64f2cce7..83a1a754 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index.rs
@@ -600,7 +600,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
assert_eq!(index.schema(), schema);
@@ -663,7 +663,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
@@ -764,7 +764,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
@@ -850,7 +850,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
@@ -946,7 +946,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
@@ -1092,7 +1092,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
@@ -1206,7 +1206,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
@@ -1291,7 +1291,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
@@ -1377,7 +1377,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
@@ -1493,7 +1493,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
};
- builder.add_batch(indexed_batch);
+ builder.add_batch(indexed_batch).unwrap();
let index = builder.finish().unwrap();
diff --git a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
index abbe60a3..41d7fbd6 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
@@ -129,11 +129,12 @@ impl SpatialIndexBuilder {
///
/// This method accumulates geometry batches that will be used to build
the spatial index.
/// Each batch contains processed geometry data along with memory usage
information.
- pub fn add_batch(&mut self, indexed_batch: EvaluatedBatch) {
- let in_mem_size = indexed_batch.in_mem_size();
+ pub fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> {
+ let in_mem_size = indexed_batch.in_mem_size()?;
self.indexed_batches.push(indexed_batch);
self.reservation.grow(in_mem_size);
self.metrics.build_mem_used.add(in_mem_size);
+ Ok(())
}
pub fn merge_stats(&mut self, stats: GeoStatistics) -> &mut Self {
@@ -298,7 +299,7 @@ impl SpatialIndexBuilder {
let mut stream = partition.build_side_batch_stream;
while let Some(batch) = stream.next().await {
let indexed_batch = batch?;
- self.add_batch(indexed_batch);
+ self.add_batch(indexed_batch)?;
}
self.merge_stats(partition.geo_statistics);
let mem_bytes = partition.reservation.free();
diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs
b/rust/sedona-spatial-join/src/operand_evaluator.rs
index b665ffea..b76e9116 100644
--- a/rust/sedona-spatial-join/src/operand_evaluator.rs
+++ b/rust/sedona-spatial-join/src/operand_evaluator.rs
@@ -34,8 +34,9 @@ use wkb::reader::Wkb;
use sedona_common::option::SpatialJoinOptions;
-use crate::spatial_predicate::{
- DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate,
+use crate::{
+ spatial_predicate::{DistancePredicate, KNNPredicate, RelationPredicate,
SpatialPredicate},
+ utils::arrow_utils::get_array_memory_size,
};
/// Operand evaluator is for evaluating the operands of a spatial predicate.
It can be a distance
@@ -154,9 +155,11 @@ impl EvaluatedGeometryArray {
&self.wkbs
}
- pub fn in_mem_size(&self) -> usize {
+ pub fn in_mem_size(&self) -> Result<usize> {
+ let geom_array_size = get_array_memory_size(&self.geometry_array)?;
+
let distance_in_mem_size = match &self.distance {
- Some(ColumnarValue::Array(array)) => array.get_array_memory_size(),
+ Some(ColumnarValue::Array(array)) => get_array_memory_size(array)?,
_ => 8,
};
@@ -164,10 +167,7 @@ impl EvaluatedGeometryArray {
// should be small, so the inaccuracy does not matter too much.
let wkb_vec_size = self.wkbs.allocated_size();
- self.geometry_array.get_array_memory_size()
- + self.rects.allocated_size()
- + distance_in_mem_size
- + wkb_vec_size
+ Ok(geom_array_size + self.rects.allocated_size() +
distance_in_mem_size + wkb_vec_size)
}
}
diff --git a/rust/sedona-spatial-join/src/utils.rs
b/rust/sedona-spatial-join/src/utils.rs
index 2f7a55a5..6db8f85a 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join/src/utils.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+pub(crate) mod arrow_utils;
pub(crate) mod bbox_sampler;
pub(crate) mod concurrent_reservation;
pub(crate) mod init_once_array;
diff --git a/rust/sedona-spatial-join/src/utils/arrow_utils.rs
b/rust/sedona-spatial-join/src/utils/arrow_utils.rs
new file mode 100644
index 00000000..c8c5779b
--- /dev/null
+++ b/rust/sedona-spatial-join/src/utils/arrow_utils.rs
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{Array, ArrayData, RecordBatch};
+use arrow_array::ArrayRef;
+use arrow_schema::{ArrowError, DataType};
+use datafusion_common::Result;
+
+/// Estimate the in-memory size of a given RecordBatch. This function
estimates the
+/// size as if the underlying buffers were copied to somewhere else and not
shared.
+pub(crate) fn get_record_batch_memory_size(batch: &RecordBatch) ->
Result<usize> {
+ let mut total_size = 0;
+
+ for array in batch.columns() {
+ let array_data = array.to_data();
+ total_size += get_array_data_memory_size(&array_data)?;
+ }
+
+ Ok(total_size)
+}
+
+/// Estimate the in-memory size of a given Arrow array. This function
estimates the
+/// size as if the underlying buffers were copied to somewhere else and not
shared,
+/// including the sizes of each BinaryView item (which is otherwise not
counted by
+/// `array_data.get_slice_memory_size()`).
+pub(crate) fn get_array_memory_size(array: &ArrayRef) -> Result<usize> {
+ let array_data = array.to_data();
+ let size = get_array_data_memory_size(&array_data)?;
+ Ok(size)
+}
+
+/// The maximum number of bytes that can be stored inline in a byte view.
+///
+/// See [`ByteView`] and [`GenericByteViewArray`] for more information on the
+/// layout of the views.
+///
+/// [`GenericByteViewArray`]:
https://docs.rs/arrow/latest/arrow/array/struct.GenericByteViewArray.html
+pub const MAX_INLINE_VIEW_LEN: u32 = 12;
+
+/// Compute the memory usage of `array_data` and its children recursively.
+fn get_array_data_memory_size(array_data: &ArrayData) ->
core::result::Result<usize, ArrowError> {
+ // The `ArrayData::get_slice_memory_size` method does not account for the
memory used by
+ // the values of BinaryView/Utf8View arrays, so we need to compute that
using
+ // `get_binary_view_value_size` and add that to the total size.
+ Ok(get_binary_view_value_size(array_data)? +
array_data.get_slice_memory_size()?)
+}
+
+fn get_binary_view_value_size(array_data: &ArrayData) -> Result<usize,
ArrowError> {
+ let mut result: usize = 0;
+ let array_data_type = array_data.data_type();
+
+ if matches!(array_data_type, DataType::BinaryView | DataType::Utf8View) {
+ // The views buffer contains length view structures with the following
layout:
+ //
https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-view-layout
+ //
+ // * Short strings, length <= 12
+ // | Bytes 0-3 | Bytes 4-15 |
+ // |------------|---------------------------------------|
+ // | length | data (padded with 0) |
+ //
+ // * Long strings, length > 12
+ // | Bytes 0-3 | Bytes 4-7 | Bytes 8-11 | Bytes 12-15 |
+ // |------------|------------|------------|-------------|
+ // | length | prefix | buf. index | offset |
+ let views = &array_data.buffer::<u128>(0)[..array_data.len()];
+ result = views
+ .iter()
+ .map(|v| {
+ let len = *v as u32;
+ if len > MAX_INLINE_VIEW_LEN {
+ len as usize
+ } else {
+ 0
+ }
+ })
+ .sum();
+ }
+
+ // If this was not a BinaryView/Utf8View array, count the bytes of any
BinaryView/Utf8View
+ // children, taking into account the slice of this array that applies to
the child.
+ for child in array_data.child_data() {
+ result += get_binary_view_value_size(child)?;
+ }
+ Ok(result)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::builder::{BinaryViewBuilder, ListBuilder};
+ use arrow_array::types::Int32Type;
+ use arrow_array::{BinaryViewArray, ListArray, StringViewArray,
StructArray};
+ use arrow_schema::{DataType, Field};
+ use std::sync::Arc;
+
+ #[test]
+ fn test_string_view_array_memory_size() {
+ let array = StringViewArray::from(vec![
+ "short", // Inline
+ "Long string that is definitely longer than 12 bytes", // 51 bytes
+ ]);
+ let array_ref: ArrayRef = Arc::new(array);
+ let size = get_array_memory_size(&array_ref).unwrap();
+ // Views: 2 * 16 = 32 bytes
+ // Data: 51 bytes
+ // Total: 83 bytes
+ assert_eq!(size, 83);
+ }
+
+ #[test]
+ fn test_binary_view_array_memory_size() {
+ let array = BinaryViewArray::from(vec![
+ "short".as_bytes(),
+ "Long string that is definitely longer than 12 bytes".as_bytes(),
+ ]);
+ let array_ref: ArrayRef = Arc::new(array);
+ let size = get_array_memory_size(&array_ref).unwrap();
+ assert_eq!(size, 83);
+ }
+
+ #[test]
+ fn test_struct_array_with_view_memory_size() {
+ let string_view_array = StringViewArray::from(vec![
+ "short",
+ "Long string that is definitely longer than 12 bytes",
+ ]);
+ let boolean_array = arrow_array::BooleanArray::from(vec![true, false]);
+
+ let struct_array = StructArray::from(vec![
+ (
+ Arc::new(Field::new("a", DataType::Utf8View, false)),
+ Arc::new(string_view_array) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("b", DataType::Boolean, false)),
+ Arc::new(boolean_array) as ArrayRef,
+ ),
+ ]);
+
+ let array_ref: ArrayRef = Arc::new(struct_array);
+ let size = get_array_memory_size(&array_ref).unwrap();
+ // 83 (StringView) + 1 (Boolean values) = 84
+ assert_eq!(size, 84);
+
+ let size = get_array_memory_size(&array_ref.slice(0, 1)).unwrap();
+ // 16 (StringView for one short element) + 1 (Boolean values) = 17
+ assert_eq!(size, 17);
+
+ let size = get_array_memory_size(&array_ref.slice(1, 1)).unwrap();
+ // 67 (StringView for one long element) + 1 (Boolean values) = 68
+ assert_eq!(size, 68);
+ }
+
+ #[test]
+ fn test_sliced_view_array_memory_size() {
+ let array = StringViewArray::from(vec![
+ "short",
+ "Long string that is definitely longer than 12 bytes",
+ "Another long string to make buffer larger",
+ ]);
+ let sliced = array.slice(0, 2);
+ let sliced_ref: ArrayRef = Arc::new(sliced);
+ let size = get_array_memory_size(&sliced_ref).unwrap();
+ // Views: 2 * 16 = 32
+ // Data used: 51 ("Long string...")
+ // Total: 83
+ assert_eq!(size, 83);
+
+ let size = get_array_memory_size(&sliced_ref.slice(1, 1)).unwrap();
+ // Views: 1 * 16 = 16
+ // Data used: 51 ("Long string...")
+ // Total: 67
+ assert_eq!(size, 67);
+
+ // Empty slice
+ let size = get_array_memory_size(&sliced_ref.slice(2, 0)).unwrap();
+ assert_eq!(size, 0);
+ }
+
+ fn build_struct_with_list_of_view_and_list_of_i32(
+ ) -> (ArrayRef, &'static [u8], &'static [u8], &'static [u8]) {
+ let short: &'static [u8] = b"short";
+ let long1: &'static [u8] = b"Long string that is definitely longer
than 12 bytes";
+ let long2: &'static [u8] = b"Another long string to make buffer
larger";
+
+ // Build List<BinaryView> with two list items:
+ // 0: [short, long1]
+ // 1: [long2]
+ let mut bv_list_builder = ListBuilder::new(BinaryViewBuilder::new());
+ bv_list_builder.values().append_value(short);
+ bv_list_builder.values().append_value(long1);
+ bv_list_builder.append(true);
+ bv_list_builder.values().append_value(long2);
+ bv_list_builder.append(true);
+ let bv_list: ListArray = bv_list_builder.finish();
+ let bv_list_data_type = bv_list.data_type().clone();
+
+ // Build List<Int32> with two list items:
+ // 0: [1, 2, 3]
+ // 1: [4]
+ let i32_list: ListArray = ListArray::from_iter_primitive::<Int32Type,
_, _>([
+ Some(vec![Some(1), Some(2), Some(3)]),
+ Some(vec![Some(4)]),
+ ]);
+ let i32_list_data_type = i32_list.data_type().clone();
+
+ let struct_array = StructArray::from(vec![
+ (
+ Arc::new(Field::new("bv_list", bv_list_data_type, false)),
+ Arc::new(bv_list) as ArrayRef,
+ ),
+ (
+ Arc::new(Field::new("i32_list", i32_list_data_type, false)),
+ Arc::new(i32_list) as ArrayRef,
+ ),
+ ]);
+
+ (Arc::new(struct_array) as ArrayRef, short, long1, long2)
+ }
+
+ #[test]
+ fn test_struct_array_with_list_of_view_and_list_of_i32_memory_size() {
+ const VIEW_BYTES: usize = 16; // BinaryView/Utf8View: one u128 per
value
+ const OFFSET_BYTES: usize = 4; // ListArray offsets are i32
+ const I32_BYTES: usize = 4;
+
+ let (struct_ref, _short, long1, long2) =
build_struct_with_list_of_view_and_list_of_i32();
+
+ // Full array expected size:
+ // - bv list offsets: 2 * 4
+ // - bv views: 3 * 16
+ // - bv long bytes: long1 + long2 (short is inline)
+ // - i32 list offsets: 2 * 4
+ // - i32 values: 4 * 4
+ let expected_bv_full = 2 * OFFSET_BYTES + 3 * VIEW_BYTES + long1.len()
+ long2.len();
+ let expected_i32_full = 2 * OFFSET_BYTES + 4 * I32_BYTES;
+ assert_eq!(
+ get_array_memory_size(&struct_ref).unwrap(),
+ expected_bv_full + expected_i32_full
+ );
+ }
+
+ #[test]
+ #[ignore = "XFAIL: get_array_memory_size slice accounting for ListArray is
incorrect/fragile"]
+ fn
test_struct_array_with_list_of_view_and_list_of_i32_memory_size_slices_xfail() {
+ const VIEW_BYTES: usize = 16; // BinaryView/Utf8View: one u128 per
value
+ const OFFSET_BYTES: usize = 4; // ListArray offsets are i32
+ const I32_BYTES: usize = 4;
+
+ let (struct_ref, _short, long1, long2) =
build_struct_with_list_of_view_and_list_of_i32();
+
+ // Slice: first struct row only
+ let slice0 = struct_ref.slice(0, 1);
+ let expected_bv_slice0 = OFFSET_BYTES + 2 * VIEW_BYTES + long1.len();
+ let expected_i32_slice0 = OFFSET_BYTES + 3 * I32_BYTES;
+ assert_eq!(
+ get_array_memory_size(&slice0).unwrap(),
+ expected_bv_slice0 + expected_i32_slice0
+ );
+
+ // Slice: second struct row only
+ let slice1 = struct_ref.slice(1, 1);
+ let expected_bv_slice1 = OFFSET_BYTES + VIEW_BYTES + long2.len();
+ let expected_i32_slice1 = OFFSET_BYTES + I32_BYTES;
+ assert_eq!(
+ get_array_memory_size(&slice1).unwrap(),
+ expected_bv_slice1 + expected_i32_slice1
+ );
+
+ // Double slice should behave the same as slice(1, 1)
+ let double_slice = struct_ref.slice(0, 2).slice(1, 1);
+ assert_eq!(
+ get_array_memory_size(&double_slice).unwrap(),
+ expected_bv_slice1 + expected_i32_slice1
+ );
+ }
+}