This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d985b4cad fix RecordBatch size in topK (#13906)
2d985b4cad is described below

commit 2d985b4cad334850d3af24b7c5d3de6a6c305cc6
Author: Namgung Chan <[email protected]>
AuthorDate: Fri Dec 27 00:28:43 2024 +0900

    fix RecordBatch size in topK (#13906)
---
 datafusion/physical-plan/src/topk/mod.rs | 49 +++++++++++++++++++++++++++++---
 1 file changed, 45 insertions(+), 4 deletions(-)

diff --git a/datafusion/physical-plan/src/topk/mod.rs 
b/datafusion/physical-plan/src/topk/mod.rs
index 6d5299f716..ca154925df 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -24,6 +24,8 @@ use arrow::{
 use std::mem::size_of;
 use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
 
+use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, 
MetricBuilder};
+use crate::spill::get_record_batch_memory_size;
 use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
 use arrow_array::{Array, ArrayRef, RecordBatch};
 use arrow_schema::SchemaRef;
@@ -36,8 +38,6 @@ use datafusion_execution::{
 use datafusion_physical_expr::PhysicalSortExpr;
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
-use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, 
MetricBuilder};
-
 /// Global TopK
 ///
 /// # Background
@@ -575,7 +575,7 @@ impl RecordBatchStore {
     pub fn insert(&mut self, entry: RecordBatchEntry) {
         // uses of 0 means that none of the rows in the batch were stored in 
the topk
         if entry.uses > 0 {
-            self.batches_size += entry.batch.get_array_memory_size();
+            self.batches_size += get_record_batch_memory_size(&entry.batch);
             self.batches.insert(entry.id, entry);
         }
     }
@@ -630,7 +630,7 @@ impl RecordBatchStore {
             let old_entry = self.batches.remove(&id).unwrap();
             self.batches_size = self
                 .batches_size
-                .checked_sub(old_entry.batch.get_array_memory_size())
+                .checked_sub(get_record_batch_memory_size(&old_entry.batch))
                 .unwrap();
         }
     }
@@ -643,3 +643,44 @@ impl RecordBatchStore {
             + self.batches_size
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::Int32Array;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use arrow_array::Float64Array;
+
+    /// This test ensures the size calculation is correct for RecordBatches 
with multiple columns.
+    #[test]
+    fn test_record_batch_store_size() {
+        // given
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("ints", DataType::Int32, true),
+            Field::new("float64", DataType::Float64, false),
+        ]));
+        let mut record_batch_store = 
RecordBatchStore::new(Arc::clone(&schema));
+        let int_array =
+            Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), 
Some(5)]); // 5 * 4 = 20
+        let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]); 
// 5 * 8 = 40
+
+        let record_batch_entry = RecordBatchEntry {
+            id: 0,
+            batch: RecordBatch::try_new(
+                schema,
+                vec![Arc::new(int_array), Arc::new(float64_array)],
+            )
+            .unwrap(),
+            uses: 1,
+        };
+
+        // when insert record batch entry
+        record_batch_store.insert(record_batch_entry);
+        assert_eq!(record_batch_store.batches_size, 60);
+
+        // when unuse record batch entry
+        record_batch_store.unuse(0);
+        assert_eq!(record_batch_store.batches_size, 0);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to