This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e92fc5abf Miscellaneous ArrayData Cleanup (#5612)
e92fc5abf is described below

commit e92fc5abf034c1e3c62e271842fdfca23ea705dd
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Mar 16 15:14:21 2023 +0000

    Miscellaneous ArrayData Cleanup (#5612)
    
    * Cleanup direct uses of ArrayData
    
    * Fix DataFrame::count
    
    * COUNT_STAR_EXPANSION
---
 datafusion/common/src/scalar.rs                     |  2 +-
 .../core/src/avro_to_arrow/arrow_array_reader.rs    |  8 +++-----
 datafusion/core/src/dataframe.rs                    |  6 +++---
 .../physical_plan/file_format/parquet/row_filter.rs |  5 ++---
 datafusion/core/src/physical_plan/joins/utils.rs    | 21 ++++++++++-----------
 datafusion/physical-expr/src/aggregate/count.rs     |  6 +++---
 datafusion/physical-expr/src/aggregate/sum.rs       |  4 ++--
 datafusion/physical-expr/src/expressions/case.rs    |  6 +++---
 datafusion/physical-expr/src/expressions/in_list.rs |  7 +++----
 9 files changed, 30 insertions(+), 35 deletions(-)

diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 5dc042586..73352941a 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -2358,7 +2358,7 @@ impl ScalarValue {
                     None => v.is_null(),
                 }
             }
-            ScalarValue::Null => array.data().is_null(index),
+            ScalarValue::Null => array.is_null(index),
         }
     }
 
diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
index 1f06078e4..313c2a159 100644
--- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -435,7 +435,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
         });
         let valid_len = cur_offset.to_usize().unwrap();
         let array_data = match list_field.data_type() {
-            DataType::Null => NullArray::new(valid_len).data().clone(),
+            DataType::Null => NullArray::new(valid_len).into_data(),
             DataType::Boolean => {
                 let num_bytes = bit_util::ceil(valid_len, 8);
                 let mut bool_values = 
MutableBuffer::from_len_zeroed(num_bytes);
@@ -496,13 +496,11 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
             DataType::Utf8 => flatten_string_values(rows)
                 .into_iter()
                 .collect::<StringArray>()
-                .data()
-                .clone(),
+                .into_data(),
             DataType::LargeUtf8 => flatten_string_values(rows)
                 .into_iter()
                 .collect::<LargeStringArray>()
-                .data()
-                .clone(),
+                .into_data(),
             DataType::List(field) => {
                 let child =
                     self.build_nested_list_array::<i32>(&flatten_values(rows), 
field)?;
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index af41725db..1b8b742c2 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -30,8 +30,8 @@ use parquet::file::properties::WriterProperties;
 use datafusion_common::from_slice::FromSlice;
 use datafusion_common::{Column, DFSchema, ScalarValue};
 use datafusion_expr::{
-    avg, count, is_null, max, median, min, stddev, TableProviderFilterPushDown,
-    UNNAMED_TABLE,
+    avg, count, is_null, max, median, min, stddev, utils::COUNT_STAR_EXPANSION,
+    TableProviderFilterPushDown, UNNAMED_TABLE,
 };
 
 use crate::arrow::datatypes::Schema;
@@ -630,7 +630,7 @@ impl DataFrame {
         let rows = self
             .aggregate(
                 vec![],
-                vec![datafusion_expr::count(Expr::Literal(ScalarValue::Null))],
+                
vec![datafusion_expr::count(Expr::Literal(COUNT_STAR_EXPANSION))],
             )?
             .collect()
             .await?;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index 54478edc7..91ae8afe6 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{Array, BooleanArray};
+use arrow::array::BooleanArray;
 use arrow::datatypes::{DataType, Schema};
 use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
@@ -131,8 +131,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
             .map(|v| v.into_array(batch.num_rows()))
         {
             Ok(array) => {
-                let mask = as_boolean_array(&array)?;
-                let bool_arr = BooleanArray::from(mask.data().clone());
+                let bool_arr = as_boolean_array(&array)?.clone();
                 let num_filtered = bool_arr.len() - bool_arr.true_count();
                 self.rows_filtered.add(num_filtered);
                 timer.stop();
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index a756d2ba8..f10ad7a15 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -18,11 +18,11 @@
 //! Join related functionality used both on logical and physical plans
 
 use arrow::array::{
-    new_null_array, Array, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
+    downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
     UInt32Builder, UInt64Array,
 };
 use arrow::compute;
-use arrow::datatypes::{Field, Schema, UInt32Type, UInt64Type};
+use arrow::datatypes::{Field, Schema};
 use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 use futures::future::{BoxFuture, Shared};
 use futures::{ready, FutureExt};
@@ -783,8 +783,8 @@ pub(crate) fn apply_join_filter_to_indices(
         filter.schema(),
         build_input_buffer,
         probe_batch,
-        PrimitiveArray::from(build_indices.data().clone()),
-        PrimitiveArray::from(probe_indices.data().clone()),
+        build_indices.clone(),
+        probe_indices.clone(),
         filter.column_indices(),
         build_side,
     )?;
@@ -794,13 +794,12 @@ pub(crate) fn apply_join_filter_to_indices(
         .into_array(intermediate_batch.num_rows());
     let mask = as_boolean_array(&filter_result)?;
 
-    let left_filtered = PrimitiveArray::<UInt64Type>::from(
-        compute::filter(&build_indices, mask)?.data().clone(),
-    );
-    let right_filtered = PrimitiveArray::<UInt32Type>::from(
-        compute::filter(&probe_indices, mask)?.data().clone(),
-    );
-    Ok((left_filtered, right_filtered))
+    let left_filtered = compute::filter(&build_indices, mask)?;
+    let right_filtered = compute::filter(&probe_indices, mask)?;
+    Ok((
+        downcast_array(left_filtered.as_ref()),
+        downcast_array(right_filtered.as_ref()),
+    ))
 }
 
 /// Returns a new [RecordBatch] by combining the `left` and `right` according 
to `indices`.
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index 38b193ebf..dc77b794a 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -137,13 +137,13 @@ impl Accumulator for CountAccumulator {
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let array = &values[0];
-        self.count += (array.len() - array.data().null_count()) as i64;
+        self.count += (array.len() - array.null_count()) as i64;
         Ok(())
     }
 
     fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let array = &values[0];
-        self.count -= (array.len() - array.data().null_count()) as i64;
+        self.count -= (array.len() - array.null_count()) as i64;
         Ok(())
     }
 
@@ -183,7 +183,7 @@ impl RowAccumulator for CountRowAccumulator {
         accessor: &mut RowAccessor,
     ) -> Result<()> {
         let array = &values[0];
-        let delta = (array.len() - array.data().null_count()) as u64;
+        let delta = (array.len() - array.null_count()) as u64;
         accessor.add_u64(self.state_index, delta);
         Ok(())
     }
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index 0a21de22d..a815a33c8 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -236,7 +236,7 @@ impl Accumulator for SumAccumulator {
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
-        self.count += (values.len() - values.data().null_count()) as u64;
+        self.count += (values.len() - values.null_count()) as u64;
         let delta = sum_batch(values, &self.sum.get_datatype())?;
         self.sum = self.sum.add(&delta)?;
         Ok(())
@@ -244,7 +244,7 @@ impl Accumulator for SumAccumulator {
 
     fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         let values = &values[0];
-        self.count -= (values.len() - values.data().null_count()) as u64;
+        self.count -= (values.len() - values.null_count()) as u64;
         let delta = sum_batch(values, &self.sum.get_datatype())?;
         self.sum = self.sum.sub(&delta)?;
         Ok(())
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index 3dff4ae97..d64947445 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -716,10 +716,10 @@ mod tests {
 
         //let valid_array = vec![true, false, false, true, false, tru
         let null_buffer = Buffer::from([0b00101001u8]);
-        let load4 = ArrayDataBuilder::new(load4.data_type().clone())
-            .len(load4.len())
+        let load4 = load4
+            .into_data()
+            .into_builder()
             .null_bit_buffer(Some(null_buffer))
-            .buffers(load4.data().buffers().to_vec())
             .build()
             .unwrap();
         let load4: Float64Array = load4.into();
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index 3a5a25ff6..e37d5cd74 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -86,7 +86,7 @@ where
 {
     fn new(array: &T, hash_set: ArrayHashSet) -> Self {
         Self {
-            array: T::from(array.data().clone()),
+            array: downcast_array(array),
             hash_set,
         }
     }
@@ -103,15 +103,14 @@ where
             v => {
                 let values_contains = self.contains(v.values().as_ref(), 
negated)?;
                 let result = take(&values_contains, v.keys(), None)?;
-                return Ok(BooleanArray::from(result.data().clone()))
+                return Ok(downcast_array(result.as_ref()))
             }
             _ => {}
         }
 
         let v = v.as_any().downcast_ref::<T>().unwrap();
-        let in_data = self.array.data();
         let in_array = &self.array;
-        let has_nulls = in_data.null_count() != 0;
+        let has_nulls = in_array.null_count() != 0;
 
         Ok(ArrayIter::new(v)
             .map(|v| {

Reply via email to