rdettai commented on a change in pull request #1141:
URL: https://github.com/apache/arrow-datafusion/pull/1141#discussion_r739011378
##########
File path: datafusion/src/physical_plan/file_format/mod.rs
##########
@@ -50,3 +165,260 @@ impl<'a> Display for FileGroupsDisplay<'a> {
write!(f, "[{}]", parts.join(", "))
}
}
+
+/// A helper that projects partition columns into the file record batches
+struct PartitionColumnProjector {
+ /// An Arrow buffer initialized to zeros that represents the key array of
all partition
+ /// columns (partition columns are materialized by dictionary arrays with
only one
+ /// value in the dictionary, thus all the keys are equal to zero).
+ key_buffer_cache: Option<Buffer>,
+ /// Mapping between the indexes in the list of partition columns and the
target
+ /// schema. Sorted by index in the target schema so that we can iterate on
it to
+ /// insert the partition columns in the target record batch.
+ projected_partition_indexes: Vec<(usize, usize)>,
+ /// The schema of the table once the projection was applied.
+ projected_schema: SchemaRef,
+}
+
+impl PartitionColumnProjector {
+ // Create a projector to insert the partitioning columns into batches read
from files
+ // - projected_schema: the target schema with both file and partitioning
columns
+ // - table_partition_cols: all the partitioning column names
+ fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) ->
Self {
+ let mut idx_map = HashMap::new();
+ for (partition_idx, partition_name) in
table_partition_cols.iter().enumerate() {
+ if let Ok(schema_idx) = projected_schema.index_of(partition_name) {
+ idx_map.insert(partition_idx, schema_idx);
+ }
+ }
+
+ let mut projected_partition_indexes: Vec<_> =
idx_map.into_iter().collect();
+ projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b));
+
+ Self {
+ projected_partition_indexes,
+ key_buffer_cache: None,
+ projected_schema,
+ }
+ }
+
+ // Transform the batch read from the file by inserting the partitioning
columns
+ // to the right positions as deduced from `projected_schema`
+ // - file_batch: batch read from the file, with internal projection applied
+ // - partition_values: the list of partition values, one for each
partition column
+ fn project(
+ &mut self,
+ file_batch: RecordBatch,
+ partition_values: &[ScalarValue],
+ ) -> ArrowResult<RecordBatch> {
+ let expected_cols =
+ self.projected_schema.fields().len() -
self.projected_partition_indexes.len();
+
+ if file_batch.columns().len() != expected_cols {
+ return Err(ArrowError::SchemaError(format!(
+ "Unexpected batch schema from file, expected {} cols but got
{}",
+ expected_cols,
+ file_batch.columns().len()
+ )));
+ }
+
+ let mut cols = file_batch.columns().to_vec();
+ for &(pidx, sidx) in &self.projected_partition_indexes {
+ cols.insert(
+ sidx,
+ create_dict_array(
+ &mut self.key_buffer_cache,
+ &partition_values[pidx],
+ file_batch.num_rows(),
+ ),
+ )
+ }
+ RecordBatch::try_new(Arc::clone(&self.projected_schema), cols)
+ }
+}
+
+fn create_dict_array(
+ key_buffer_cache: &mut Option<Buffer>,
+ val: &ScalarValue,
+ len: usize,
+) -> ArrayRef {
+ // build value dictionary
+ let dict_vals = val.to_array();
+
+ // build keys array
+ let sliced_key_buffer = match key_buffer_cache {
+ Some(buf) if buf.len() >= len => buf.slice(buf.len() - len),
+ _ => {
+ let mut key_buffer_builder = UInt8BufferBuilder::new(len);
+ key_buffer_builder.advance(len); // keys are all 0
+ key_buffer_cache.insert(key_buffer_builder.finish()).clone()
+ }
+ };
+
+ // create data type
+ let data_type =
+ DataType::Dictionary(Box::new(DataType::UInt8),
Box::new(val.get_datatype()));
+
+ debug_assert_eq!(data_type, *DEFAULT_PARTITION_COLUMN_DATATYPE);
+
+ // assemble pieces together
+ let mut builder = ArrayData::builder(data_type)
+ .len(len)
+ .add_buffer(sliced_key_buffer);
+ builder = builder.add_child_data(dict_vals.data().clone());
+ Arc::new(DictionaryArray::<UInt8Type>::from(builder.build().unwrap()))
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::test::{
+ aggr_test_schema, build_table_i32, columns,
object_store::TestObjectStore,
+ };
+
+ use super::*;
+
+ #[test]
+ fn physical_plan_config_no_projection() {
+ let file_schema = aggr_test_schema();
+ let conf = config_for_projection(
+ Arc::clone(&file_schema),
+ None,
+ Statistics::default(),
+ vec!["date".to_owned()],
+ );
+
+ let (proj_schema, proj_statistics) = conf.project();
+ assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
+ assert_eq!(
+ proj_schema.field(file_schema.fields().len()).name(),
+ "date",
+ "partition columns are the last columns"
+ );
+ assert_eq!(
+ proj_statistics
+ .column_statistics
+ .expect("projection creates column statistics")
+ .len(),
+ file_schema.fields().len() + 1
+ );
+ // TODO implement tests for partition column statistics once
implemented
+
+ let col_names = conf.projected_file_column_names();
+ assert_eq!(col_names, None);
+
+ let col_indices = conf.file_column_projection_indices();
+ assert_eq!(col_indices, None);
+ }
+
+ #[test]
+ fn physical_plan_config_with_projection() {
+ let file_schema = aggr_test_schema();
+ let conf = config_for_projection(
+ Arc::clone(&file_schema),
+ Some(vec![file_schema.fields().len(), 0]),
+ Statistics {
+ num_rows: Some(10),
+ // assign the column index to distinct_count to help assert
+ // the source statistic after the projection
+ column_statistics: Some(
+ (0..file_schema.fields().len())
+ .map(|i| ColumnStatistics {
+ distinct_count: Some(i),
+ ..Default::default()
+ })
+ .collect(),
+ ),
+ ..Default::default()
+ },
+ vec!["date".to_owned()],
+ );
+
+ let (proj_schema, proj_statistics) = conf.project();
+ assert_eq!(
+ columns(&proj_schema),
+ vec!["date".to_owned(), "c1".to_owned()]
+ );
+ let proj_stat_cols = proj_statistics
+ .column_statistics
+ .expect("projection creates column statistics");
+ assert_eq!(proj_stat_cols.len(), 2);
+ // TODO implement tests for proj_stat_cols[0] once partition column
+ // statistics are implemented
+ assert_eq!(proj_stat_cols[1].distinct_count, Some(0));
+
+ let col_names = conf.projected_file_column_names();
+ assert_eq!(col_names, Some(vec!["c1".to_owned()]));
+
+ let col_indices = conf.file_column_projection_indices();
+ assert_eq!(col_indices, Some(vec![0]));
+ }
+
+ #[test]
+ fn partition_column_projector() {
+ let file_batch = build_table_i32(
+ ("a", &vec![0, 1, 2]),
+ ("b", &vec![-2, -1, 0]),
+ ("c", &vec![10, 11, 12]),
+ );
+ let partition_cols =
+ vec!["year".to_owned(), "month".to_owned(), "day".to_owned()];
+ // create a projected schema
+ let conf = config_for_projection(
+ file_batch.schema(),
+ // keep all cols from file and 2 from partitioning
+ Some(vec![
+ 0,
+ 1,
+ 2,
+ file_batch.schema().fields().len(),
+ file_batch.schema().fields().len() + 2,
+ ]),
+ Statistics::default(),
+ partition_cols.clone(),
+ );
+ let (proj_schema, _) = conf.project();
+ // create a projector for that projected schema
+ let mut proj = PartitionColumnProjector::new(proj_schema,
&partition_cols);
+ let projected_batch = proj
+ .project(
+ // file_batch is ok here because we kept all the file cols in
the projection
+ file_batch,
+ &[
+ ScalarValue::Utf8(Some("2021".to_owned())),
+ ScalarValue::Utf8(Some("10".to_owned())),
+ ScalarValue::Utf8(Some("26".to_owned())),
+ ],
+ )
+ .expect("Projection of partition columns into record batch
failed");
+
+ let expected = vec![
+ "+---+----+----+------+-----+",
+ "| a | b | c | year | day |",
+ "+---+----+----+------+-----+",
+ "| 0 | -2 | 10 | 2021 | 26 |",
+ "| 1 | -1 | 11 | 2021 | 26 |",
+ "| 2 | 0 | 12 | 2021 | 26 |",
+ "+---+----+----+------+-----+",
+ ];
+ crate::assert_batches_eq!(expected, &[projected_batch]);
Review comment:
doing it for both larger and smaller batches
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]