logan-keede commented on code in PR #14671: URL: https://github.com/apache/datafusion/pull/14671#discussion_r1963794088
########## datafusion/datasource/src/memory.rs: ########## @@ -0,0 +1,926 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Execution plan for reading in-memory batches of data + +use std::any::Any; +use std::fmt; +use std::sync::Arc; + +use crate::source::{DataSource, DataSourceExec}; +use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion_physical_plan::memory::MemoryStream; +use datafusion_physical_plan::projection::{ + all_alias_free_columns, new_projections_for_columns, ProjectionExec, +}; +use datafusion_physical_plan::{ + common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, + PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics, +}; + +use arrow::array::{RecordBatch, RecordBatchOptions}; +use arrow::datatypes::{Schema, SchemaRef}; +use datafusion_common::{ + internal_err, plan_err, project_schema, Constraints, Result, ScalarValue, +}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::equivalence::ProjectionMapping; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::utils::collect_columns; +use datafusion_physical_expr::{EquivalenceProperties, LexOrdering}; + +/// 
Execution plan for reading in-memory batches of data +#[derive(Clone)] +#[deprecated( + since = "46.0.0", + note = "use MemorySourceConfig and DataSourceExec instead" +)] +pub struct MemoryExec { + inner: DataSourceExec, + /// The partitions to query + partitions: Vec<Vec<RecordBatch>>, + /// Optional projection + projection: Option<Vec<usize>>, + // Sort information: one or more equivalent orderings + sort_information: Vec<LexOrdering>, + /// if partition sizes should be displayed + show_sizes: bool, +} + +#[allow(unused, deprecated)] +impl fmt::Debug for MemoryExec { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.inner.fmt_as(DisplayFormatType::Default, f) + } +} + +#[allow(unused, deprecated)] +impl DisplayAs for MemoryExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + self.inner.fmt_as(t, f) + } +} + +#[allow(unused, deprecated)] +impl ExecutionPlan for MemoryExec { + fn name(&self) -> &'static str { + "MemoryExec" + } + + /// Return a reference to Any that can be used for downcasting + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + self.inner.properties() + } + + fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> { + // This is a leaf node and has no children + vec![] + } + + fn with_new_children( + self: Arc<Self>, + children: Vec<Arc<dyn ExecutionPlan>>, + ) -> Result<Arc<dyn ExecutionPlan>> { + // MemoryExec has no children + if children.is_empty() { + Ok(self) + } else { + internal_err!("Children cannot be replaced in {self:?}") + } + } + + fn execute( + &self, + partition: usize, + context: Arc<TaskContext>, + ) -> Result<SendableRecordBatchStream> { + self.inner.execute(partition, context) + } + + /// We recompute the statistics dynamically from the arrow metadata as it is pretty cheap to do so + fn statistics(&self) -> Result<Statistics> { + self.inner.statistics() + } + + fn try_swapping_with_projection( + &self, + projection: &ProjectionExec, + ) -> 
Result<Option<Arc<dyn ExecutionPlan>>> { + self.inner.try_swapping_with_projection(projection) + } +} + +#[allow(unused, deprecated)] +impl MemoryExec { + /// Create a new execution plan for reading in-memory record batches + /// The provided `schema` should not have the projection applied. + pub fn try_new( + partitions: &[Vec<RecordBatch>], + schema: SchemaRef, + projection: Option<Vec<usize>>, + ) -> Result<Self> { + let source = MemorySourceConfig::try_new(partitions, schema, projection.clone())?; + let data_source = DataSourceExec::new(Arc::new(source)); + Ok(Self { + inner: data_source, + partitions: partitions.to_vec(), + projection, + sort_information: vec![], + show_sizes: true, + }) + } + + /// Create a new execution plan from a list of constant values (`ValuesExec`) + pub fn try_new_as_values( + schema: SchemaRef, + data: Vec<Vec<Arc<dyn PhysicalExpr>>>, + ) -> Result<Self> { + if data.is_empty() { + return plan_err!("Values list cannot be empty"); + } + + let n_row = data.len(); + let n_col = schema.fields().len(); + + // We have this single row batch as a placeholder to satisfy evaluation argument + // and generate a single output row + let placeholder_schema = Arc::new(Schema::empty()); + let placeholder_batch = RecordBatch::try_new_with_options( + Arc::clone(&placeholder_schema), + vec![], + &RecordBatchOptions::new().with_row_count(Some(1)), + )?; + + // Evaluate each column + let arrays = (0..n_col) + .map(|j| { + (0..n_row) + .map(|i| { + let expr = &data[i][j]; + let result = expr.evaluate(&placeholder_batch)?; + + match result { + ColumnarValue::Scalar(scalar) => Ok(scalar), + ColumnarValue::Array(array) if array.len() == 1 => { + ScalarValue::try_from_array(&array, 0) + } + ColumnarValue::Array(_) => { + plan_err!("Cannot have array values in a values list") + } + } + }) + .collect::<Result<Vec<_>>>() + .and_then(ScalarValue::iter_to_array) + }) + .collect::<Result<Vec<_>>>()?; + + let batch = RecordBatch::try_new_with_options( + 
Arc::clone(&schema), + arrays, + &RecordBatchOptions::new().with_row_count(Some(n_row)), + )?; + + let partitions = vec![batch]; + Self::try_new_from_batches(Arc::clone(&schema), partitions) + } + + /// Create a new plan using the provided schema and batches. + /// + /// Errors if any of the batches don't match the provided schema, or if no + /// batches are provided. + pub fn try_new_from_batches( + schema: SchemaRef, + batches: Vec<RecordBatch>, + ) -> Result<Self> { + if batches.is_empty() { + return plan_err!("Values list cannot be empty"); + } + + for batch in &batches { + let batch_schema = batch.schema(); + if batch_schema != schema { + return plan_err!( + "Batch has invalid schema. Expected: {}, got: {}", + schema, + batch_schema + ); + } + } + + let partitions = vec![batches]; + let source = MemorySourceConfig { + partitions: partitions.clone(), + schema: Arc::clone(&schema), + projected_schema: Arc::clone(&schema), + projection: None, + sort_information: vec![], + show_sizes: true, + fetch: None, + }; + let data_source = DataSourceExec::new(Arc::new(source)); + Ok(Self { + inner: data_source, + partitions, + projection: None, + sort_information: vec![], + show_sizes: true, + }) + } + + fn memory_source_config(&self) -> MemorySourceConfig { + self.inner + .source() + .as_any() + .downcast_ref::<MemorySourceConfig>() + .unwrap() + .clone() + } + + pub fn with_constraints(mut self, constraints: Constraints) -> Self { + self.inner = self.inner.with_constraints(constraints); + self + } + + /// Set `show_sizes` to determine whether to display partition sizes + pub fn with_show_sizes(mut self, show_sizes: bool) -> Self { + let mut memory_source = self.memory_source_config(); + memory_source.show_sizes = show_sizes; + self.show_sizes = show_sizes; + self.inner = DataSourceExec::new(Arc::new(memory_source)); + self + } + + /// Ref to constraints + pub fn constraints(&self) -> &Constraints { + self.properties().equivalence_properties().constraints() + } + + /// Ref 
to partitions + pub fn partitions(&self) -> &[Vec<RecordBatch>] { + &self.partitions + } + + /// Ref to projection + pub fn projection(&self) -> &Option<Vec<usize>> { + &self.projection + } + + /// Show sizes + pub fn show_sizes(&self) -> bool { + self.show_sizes + } + + /// Ref to sort information + pub fn sort_information(&self) -> &[LexOrdering] { + &self.sort_information + } + + /// A memory table can be ordered by multiple expressions simultaneously. + /// [`EquivalenceProperties`] keeps track of expressions that describe the + /// global ordering of the schema. These columns are not necessarily same; e.g. + /// ```text + /// ┌-------┐ + /// | a | b | + /// |---|---| + /// | 1 | 9 | + /// | 2 | 8 | + /// | 3 | 7 | + /// | 5 | 5 | + /// └---┴---┘ + /// ``` + /// where both `a ASC` and `b DESC` can describe the table ordering. With + /// [`EquivalenceProperties`], we can keep track of these equivalences + /// and treat `a ASC` and `b DESC` as the same ordering requirement. + /// + /// Note that if there is an internal projection, that projection will be + /// also applied to the given `sort_information`. + pub fn try_with_sort_information( + mut self, + sort_information: Vec<LexOrdering>, + ) -> Result<Self> { + self.sort_information = sort_information.clone(); + let mut memory_source = self.memory_source_config(); + memory_source = memory_source.try_with_sort_information(sort_information)?; + self.inner = DataSourceExec::new(Arc::new(memory_source)); + Ok(self) + } + + /// Arc clone of ref to original schema + pub fn original_schema(&self) -> SchemaRef { + Arc::clone(&self.inner.schema()) + } + + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. 
+ fn compute_properties( + schema: SchemaRef, + orderings: &[LexOrdering], + constraints: Constraints, + partitions: &[Vec<RecordBatch>], + ) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new_with_orderings(schema, orderings) + .with_constraints(constraints), + Partitioning::UnknownPartitioning(partitions.len()), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +/// Data source configuration for reading in-memory batches of data +#[derive(Clone)] +pub struct MemorySourceConfig { Review Comment: Yes, that's what I had in mind so far. However, now that I think about it, I don't think the format-specific crates (except parquet) will have more than 2 files each (file format, source + mod). If we are going to keep a separate crate for each of them, maybe we can keep a separate crate for `MemorySourceConfig` (1 file, source + mod.rs) too. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org