rspears74 commented on issue #8975:
URL: 
https://github.com/apache/arrow-datafusion/issues/8975#issuecomment-1915285777

   The two things that need to be changed are the parquet path being read (in 
`main`), and the group_by columns (in `agg_table`). Running this throws the 
same error as above.
   
   ```
   use datafusion::{
       arrow::{
           array::{Array, ArrayRef, AsArray, Float64Array, ListArray},
           buffer::OffsetBuffer,
           datatypes::{DataType, Field, Float64Type}
       }, common::DFSchema, dataframe::DataFrame, error::{
           DataFusionError,
           Result as DfResult,
       }, execution::{context::SessionContext, options::ParquetReadOptions}, 
logical_expr::{self as F, Expr, Volatility}, physical_plan::Accumulator, 
scalar::ScalarValue
   };
   use std::{io::Error, sync::Arc};
   use std::io::ErrorKind;
   use anyhow::Result;
   
   #[tokio::main]
   async fn main() -> Result<()> {
       // enter a parquet file path here that has numeric columns with null 
values
       // (if multiple, same schema, i.e. parquet files that are part of the 
same table)
       let paths = vec![
           "path/to/somefile.snappy.parquet"
       ];
       let cxt = SessionContext::new();
       map(paths, &cxt).await
   }
   
   pub async fn map(file_paths: Vec<&str>, df_context: &SessionContext) -> 
Result<()> {
       let input = read_files(df_context, file_paths).await?;
       let result = agg_table(input).await?;
       result.show().await?;
       Ok(())
   }
   
   async fn read_files(context: &SessionContext, paths: Vec<&str>) -> 
Result<DataFrame> {
       context.read_parquet(paths, ParquetReadOptions::default()).await
           .map_err(|e| e.into())
   }
   
   pub async fn agg_table(df: DataFrame) -> Result<DataFrame> {
       // aggregate by some columns
       let group_cols = vec!["SomeCol", "AnotherCol"];
       let cols = df_column_filter(
           &df.schema(),
           vec![DataType::Float64, DataType::Int32],
           &group_cols,
           None
       );
       let agg_exprs = build_agg(&cols);
       df.aggregate(cols_cased(group_cols), agg_exprs)
           .map_err(|e| e.into())
   }
   
   fn df_column_filter(df_schema: &DFSchema, data_types: Vec<DataType>, 
excluded_cols: &Vec<&str>, truncate: Option<usize>) -> Vec<String> {
       let mut col_fields = df_schema.fields().to_owned();
       match truncate {
           Some(limit) => col_fields.truncate(limit),
           None => ()
       }
       col_fields
           .into_iter()
           .map(|f| (f.name().to_owned(), f.data_type().to_owned()))
           .filter(|c| {
               data_types.contains(&c.1) &&
               !excluded_cols.contains(&c.0.as_str())
           })
           .map(|c| c.0).collect::<Vec<_>>()
   }
   
   fn build_agg(cols: &Vec<String>) -> Vec<Expr> {
       cols.iter().map(|c| {
           vec_agg(col_cased(c)).alias(c)
       }).collect()
   }
   
   fn col_cased(col_name: &str) -> Expr {
       F::col(format!(r#""{col_name}""#))
   }
   
   fn cols_cased(col_names: Vec<&str>) -> Vec<Expr> {
       col_names.iter().map(|n| col_cased(n)).collect()
   }
   
   #[derive(Debug)]
   pub struct VecAgg {
       values: Vec<f64>
   }
   
   impl VecAgg {
       // how the struct is initialized
       pub fn new() -> Self {
           VecAgg { values: vec![] }
       }
   }
   
   impl Accumulator for VecAgg {
       fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
           Ok(vec![
               to_scalarvalue_list(Float64Array::from(self.values.clone()), 
self.values.len(), DataType::Float64),
           ])
       }
   
       fn evaluate(&mut self) -> DfResult<ScalarValue> {
           Ok(to_scalarvalue_list(Float64Array::from(self.values.clone()), 
self.values.len(), DataType::Float64))
       }
   
       fn update_batch(&mut self, values: &[ArrayRef]) -> DfResult<()> {
           if values.is_empty() {
               return Ok(());
           }
           let arr = &values[0];
           let mut float_values: Vec<f64> = (0..arr.len()).map(|index| {
               let v = ScalarValue::try_from_array(arr, index)?;
   
               if let ScalarValue::Float64(Some(value)) = v {
                   Ok(value)
               } else {
                   
Err(DataFusionError::IoError(Error::new(ErrorKind::InvalidData, "Null")))
               }
           }).flatten().collect();
           if self.values.len() < 100 {
               self.values.append(&mut float_values);
           }
           Ok(())
       }
   
       fn merge_batch(&mut self, states: &[ArrayRef]) -> DfResult<()> {
           if states.is_empty() {
               return Ok(());
           }
           let arr = &states[0];
           (0..arr.len()).try_for_each(|index| {
               let v = states
                   .iter()
                   .map(|array| ScalarValue::try_from_array(array, index))
                   .collect::<DfResult<Vec<_>>>()?;
               if let ScalarValue::List(arr) = &v[0] {
                   let mut vals = arr.values()
                       .as_primitive::<Float64Type>()
                       .values()
                       .to_vec();
                   if self.values.len() + vals.len() < 100 {
                       self.values.append(&mut vals);
                   }
               } else {
                   unreachable!("")
               }
               Ok(())
           })
       }
   
       fn size(&self) -> usize {
           std::mem::size_of_val(self)
       }
   }
   
   pub fn to_scalarvalue_list<T: Array + 'static>(array: T, len: usize, 
data_type: DataType) -> ScalarValue {
       ScalarValue::List(
           Arc::new(
               ListArray::new(
                   Field::new(
                       "item",
                       data_type,
                       true
                   ).into(),
                   OffsetBuffer::new_zeroed(len),
                   Arc::new(array),
                   None
               )
           )
       )
   }
   
   pub fn vec_agg(expr: Expr) -> Expr {
       let vec_agg = F::create_udaf(
           "vec_agg",
           vec![DataType::Float64],
           Arc::new(DataType::List(
               Field::new(
                   "item",
                   DataType::Float64,
                   true
               ).into()
           )),
           Volatility::Immutable,
           Arc::new(|_| Ok(Box::new(VecAgg::new()))),
           Arc::new(vec![DataType::List(
               Field::new(
                   "item",
                   DataType::Float64,
                   true
               ).into()
           )]),
       );
       vec_agg.call(vec![expr])
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to