This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e92fc5abf Miscellaneous ArrayData Cleanup (#5612)
e92fc5abf is described below
commit e92fc5abf034c1e3c62e271842fdfca23ea705dd
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Mar 16 15:14:21 2023 +0000
Miscellaneous ArrayData Cleanup (#5612)
* Cleanup direct uses of ArrayData
* Fix DataFrame::count
* COUNT_STAR_EXPANSION
---
datafusion/common/src/scalar.rs | 2 +-
.../core/src/avro_to_arrow/arrow_array_reader.rs | 8 +++-----
datafusion/core/src/dataframe.rs | 6 +++---
.../physical_plan/file_format/parquet/row_filter.rs | 5 ++---
datafusion/core/src/physical_plan/joins/utils.rs | 21 ++++++++++-----------
datafusion/physical-expr/src/aggregate/count.rs | 6 +++---
datafusion/physical-expr/src/aggregate/sum.rs | 4 ++--
datafusion/physical-expr/src/expressions/case.rs | 6 +++---
datafusion/physical-expr/src/expressions/in_list.rs | 7 +++----
9 files changed, 30 insertions(+), 35 deletions(-)
diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs
index 5dc042586..73352941a 100644
--- a/datafusion/common/src/scalar.rs
+++ b/datafusion/common/src/scalar.rs
@@ -2358,7 +2358,7 @@ impl ScalarValue {
None => v.is_null(),
}
}
- ScalarValue::Null => array.data().is_null(index),
+ ScalarValue::Null => array.is_null(index),
}
}
diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
index 1f06078e4..313c2a159 100644
--- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
+++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -435,7 +435,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
});
let valid_len = cur_offset.to_usize().unwrap();
let array_data = match list_field.data_type() {
- DataType::Null => NullArray::new(valid_len).data().clone(),
+ DataType::Null => NullArray::new(valid_len).into_data(),
DataType::Boolean => {
let num_bytes = bit_util::ceil(valid_len, 8);
let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes);
@@ -496,13 +496,11 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
DataType::Utf8 => flatten_string_values(rows)
.into_iter()
.collect::<StringArray>()
- .data()
- .clone(),
+ .into_data(),
DataType::LargeUtf8 => flatten_string_values(rows)
.into_iter()
.collect::<LargeStringArray>()
- .data()
- .clone(),
+ .into_data(),
DataType::List(field) => {
let child = self.build_nested_list_array::<i32>(&flatten_values(rows), field)?;
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index af41725db..1b8b742c2 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -30,8 +30,8 @@ use parquet::file::properties::WriterProperties;
use datafusion_common::from_slice::FromSlice;
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::{
- avg, count, is_null, max, median, min, stddev, TableProviderFilterPushDown,
- UNNAMED_TABLE,
+ avg, count, is_null, max, median, min, stddev, utils::COUNT_STAR_EXPANSION,
+ TableProviderFilterPushDown, UNNAMED_TABLE,
};
use crate::arrow::datatypes::Schema;
@@ -630,7 +630,7 @@ impl DataFrame {
let rows = self
.aggregate(
vec![],
- vec![datafusion_expr::count(Expr::Literal(ScalarValue::Null))],
+ vec![datafusion_expr::count(Expr::Literal(COUNT_STAR_EXPANSION))],
)?
.collect()
.await?;
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index 54478edc7..91ae8afe6 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::array::{Array, BooleanArray};
+use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
@@ -131,8 +131,7 @@ impl ArrowPredicate for DatafusionArrowPredicate {
.map(|v| v.into_array(batch.num_rows()))
{
Ok(array) => {
- let mask = as_boolean_array(&array)?;
- let bool_arr = BooleanArray::from(mask.data().clone());
+ let bool_arr = as_boolean_array(&array)?.clone();
let num_filtered = bool_arr.len() - bool_arr.true_count();
self.rows_filtered.add(num_filtered);
timer.stop();
diff --git a/datafusion/core/src/physical_plan/joins/utils.rs b/datafusion/core/src/physical_plan/joins/utils.rs
index a756d2ba8..f10ad7a15 100644
--- a/datafusion/core/src/physical_plan/joins/utils.rs
+++ b/datafusion/core/src/physical_plan/joins/utils.rs
@@ -18,11 +18,11 @@
//! Join related functionality used both on logical and physical plans
use arrow::array::{
- new_null_array, Array, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
+ downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
UInt32Builder, UInt64Array,
};
use arrow::compute;
-use arrow::datatypes::{Field, Schema, UInt32Type, UInt64Type};
+use arrow::datatypes::{Field, Schema};
use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use futures::future::{BoxFuture, Shared};
use futures::{ready, FutureExt};
@@ -783,8 +783,8 @@ pub(crate) fn apply_join_filter_to_indices(
filter.schema(),
build_input_buffer,
probe_batch,
- PrimitiveArray::from(build_indices.data().clone()),
- PrimitiveArray::from(probe_indices.data().clone()),
+ build_indices.clone(),
+ probe_indices.clone(),
filter.column_indices(),
build_side,
)?;
@@ -794,13 +794,12 @@ pub(crate) fn apply_join_filter_to_indices(
.into_array(intermediate_batch.num_rows());
let mask = as_boolean_array(&filter_result)?;
- let left_filtered = PrimitiveArray::<UInt64Type>::from(
- compute::filter(&build_indices, mask)?.data().clone(),
- );
- let right_filtered = PrimitiveArray::<UInt32Type>::from(
- compute::filter(&probe_indices, mask)?.data().clone(),
- );
- Ok((left_filtered, right_filtered))
+ let left_filtered = compute::filter(&build_indices, mask)?;
+ let right_filtered = compute::filter(&probe_indices, mask)?;
+ Ok((
+ downcast_array(left_filtered.as_ref()),
+ downcast_array(right_filtered.as_ref()),
+ ))
}
/// Returns a new [RecordBatch] by combining the `left` and `right` according to `indices`.
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index 38b193ebf..dc77b794a 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -137,13 +137,13 @@ impl Accumulator for CountAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array = &values[0];
- self.count += (array.len() - array.data().null_count()) as i64;
+ self.count += (array.len() - array.null_count()) as i64;
Ok(())
}
fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let array = &values[0];
- self.count -= (array.len() - array.data().null_count()) as i64;
+ self.count -= (array.len() - array.null_count()) as i64;
Ok(())
}
@@ -183,7 +183,7 @@ impl RowAccumulator for CountRowAccumulator {
accessor: &mut RowAccessor,
) -> Result<()> {
let array = &values[0];
- let delta = (array.len() - array.data().null_count()) as u64;
+ let delta = (array.len() - array.null_count()) as u64;
accessor.add_u64(self.state_index, delta);
Ok(())
}
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index 0a21de22d..a815a33c8 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -236,7 +236,7 @@ impl Accumulator for SumAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &values[0];
- self.count += (values.len() - values.data().null_count()) as u64;
+ self.count += (values.len() - values.null_count()) as u64;
let delta = sum_batch(values, &self.sum.get_datatype())?;
self.sum = self.sum.add(&delta)?;
Ok(())
@@ -244,7 +244,7 @@ impl Accumulator for SumAccumulator {
fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
let values = &values[0];
- self.count -= (values.len() - values.data().null_count()) as u64;
+ self.count -= (values.len() - values.null_count()) as u64;
let delta = sum_batch(values, &self.sum.get_datatype())?;
self.sum = self.sum.sub(&delta)?;
Ok(())
diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs
index 3dff4ae97..d64947445 100644
--- a/datafusion/physical-expr/src/expressions/case.rs
+++ b/datafusion/physical-expr/src/expressions/case.rs
@@ -716,10 +716,10 @@ mod tests {
//let valid_array = vec![true, false, false, true, false, tru
let null_buffer = Buffer::from([0b00101001u8]);
- let load4 = ArrayDataBuilder::new(load4.data_type().clone())
- .len(load4.len())
+ let load4 = load4
+ .into_data()
+ .into_builder()
.null_bit_buffer(Some(null_buffer))
- .buffers(load4.data().buffers().to_vec())
.build()
.unwrap();
let load4: Float64Array = load4.into();
diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs
index 3a5a25ff6..e37d5cd74 100644
--- a/datafusion/physical-expr/src/expressions/in_list.rs
+++ b/datafusion/physical-expr/src/expressions/in_list.rs
@@ -86,7 +86,7 @@ where
{
fn new(array: &T, hash_set: ArrayHashSet) -> Self {
Self {
- array: T::from(array.data().clone()),
+ array: downcast_array(array),
hash_set,
}
}
@@ -103,15 +103,14 @@ where
v => {
let values_contains = self.contains(v.values().as_ref(), negated)?;
let result = take(&values_contains, v.keys(), None)?;
- return Ok(BooleanArray::from(result.data().clone()))
+ return Ok(downcast_array(result.as_ref()))
}
_ => {}
}
let v = v.as_any().downcast_ref::<T>().unwrap();
- let in_data = self.array.data();
let in_array = &self.array;
- let has_nulls = in_data.null_count() != 0;
+ let has_nulls = in_array.null_count() != 0;
Ok(ArrayIter::new(v)
.map(|v| {