alamb commented on code in PR #14671:
URL: https://github.com/apache/datafusion/pull/14671#discussion_r1963528527


##########
datafusion-cli/src/functions.rs:
##########
@@ -28,10 +28,10 @@ use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches;
 use datafusion::catalog::{Session, TableFunctionImpl};
 use datafusion::common::{plan_err, Column};
+use datafusion::datasource::memory::MemorySourceConfig;

Review Comment:
   
![giphy](https://github.com/user-attachments/assets/b8ab0fec-8c3e-46c2-8524-c30d46e5fa4d)
   



##########
datafusion/core/Cargo.toml:
##########
@@ -222,3 +222,7 @@ required-features = ["nested_expressions"]
 [[bench]]
 harness = false
 name = "dataframe"
+
+[[bench]]
+harness = false

Review Comment:
   👍 



##########
datafusion/datasource/src/memory.rs:
##########
@@ -0,0 +1,926 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for reading in-memory batches of data
+
+use std::any::Any;
+use std::fmt;
+use std::sync::Arc;
+
+use crate::source::{DataSource, DataSourceExec};
+use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion_physical_plan::memory::MemoryStream;
+use datafusion_physical_plan::projection::{
+    all_alias_free_columns, new_projections_for_columns, ProjectionExec,
+};
+use datafusion_physical_plan::{
+    common, ColumnarValue, DisplayAs, DisplayFormatType, ExecutionPlan, 
Partitioning,
+    PhysicalExpr, PlanProperties, SendableRecordBatchStream, Statistics,
+};
+
+use arrow::array::{RecordBatch, RecordBatchOptions};
+use arrow::datatypes::{Schema, SchemaRef};
+use datafusion_common::{
+    internal_err, plan_err, project_schema, Constraints, Result, ScalarValue,
+};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::equivalence::ProjectionMapping;
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::utils::collect_columns;
+use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
+
+/// Execution plan for reading in-memory batches of data
+#[derive(Clone)]
+#[deprecated(
+    since = "46.0.0",
+    note = "use MemorySourceConfig and DataSourceExec instead"
+)]
+pub struct MemoryExec {
+    inner: DataSourceExec,
+    /// The partitions to query
+    partitions: Vec<Vec<RecordBatch>>,
+    /// Optional projection
+    projection: Option<Vec<usize>>,
+    // Sort information: one or more equivalent orderings
+    sort_information: Vec<LexOrdering>,
+    /// if partition sizes should be displayed
+    show_sizes: bool,
+}
+
+#[allow(unused, deprecated)]
+impl fmt::Debug for MemoryExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        self.inner.fmt_as(DisplayFormatType::Default, f)
+    }
+}
+
+#[allow(unused, deprecated)]
+impl DisplayAs for MemoryExec {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> 
fmt::Result {
+        self.inner.fmt_as(t, f)
+    }
+}
+
+#[allow(unused, deprecated)]
+impl ExecutionPlan for MemoryExec {
+    fn name(&self) -> &'static str {
+        "MemoryExec"
+    }
+
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.inner.properties()
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        // This is a leaf node and has no children
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // MemoryExec has no children
+        if children.is_empty() {
+            Ok(self)
+        } else {
+            internal_err!("Children cannot be replaced in {self:?}")
+        }
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        self.inner.execute(partition, context)
+    }
+
+    /// We recompute the statistics dynamically from the arrow metadata as it 
is pretty cheap to do so
+    fn statistics(&self) -> Result<Statistics> {
+        self.inner.statistics()
+    }
+
+    fn try_swapping_with_projection(
+        &self,
+        projection: &ProjectionExec,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        self.inner.try_swapping_with_projection(projection)
+    }
+}
+
+#[allow(unused, deprecated)]
+impl MemoryExec {
+    /// Create a new execution plan for reading in-memory record batches
+    /// The provided `schema` should not have the projection applied.
+    pub fn try_new(
+        partitions: &[Vec<RecordBatch>],
+        schema: SchemaRef,
+        projection: Option<Vec<usize>>,
+    ) -> Result<Self> {
+        let source = MemorySourceConfig::try_new(partitions, schema, 
projection.clone())?;
+        let data_source = DataSourceExec::new(Arc::new(source));
+        Ok(Self {
+            inner: data_source,
+            partitions: partitions.to_vec(),
+            projection,
+            sort_information: vec![],
+            show_sizes: true,
+        })
+    }
+
+    /// Create a new execution plan from a list of constant values 
(`ValuesExec`)
+    pub fn try_new_as_values(
+        schema: SchemaRef,
+        data: Vec<Vec<Arc<dyn PhysicalExpr>>>,
+    ) -> Result<Self> {
+        if data.is_empty() {
+            return plan_err!("Values list cannot be empty");
+        }
+
+        let n_row = data.len();
+        let n_col = schema.fields().len();
+
+        // We have this single row batch as a placeholder to satisfy 
evaluation argument
+        // and generate a single output row
+        let placeholder_schema = Arc::new(Schema::empty());
+        let placeholder_batch = RecordBatch::try_new_with_options(
+            Arc::clone(&placeholder_schema),
+            vec![],
+            &RecordBatchOptions::new().with_row_count(Some(1)),
+        )?;
+
+        // Evaluate each column
+        let arrays = (0..n_col)
+            .map(|j| {
+                (0..n_row)
+                    .map(|i| {
+                        let expr = &data[i][j];
+                        let result = expr.evaluate(&placeholder_batch)?;
+
+                        match result {
+                            ColumnarValue::Scalar(scalar) => Ok(scalar),
+                            ColumnarValue::Array(array) if array.len() == 1 => 
{
+                                ScalarValue::try_from_array(&array, 0)
+                            }
+                            ColumnarValue::Array(_) => {
+                                plan_err!("Cannot have array values in a 
values list")
+                            }
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()
+                    .and_then(ScalarValue::iter_to_array)
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let batch = RecordBatch::try_new_with_options(
+            Arc::clone(&schema),
+            arrays,
+            &RecordBatchOptions::new().with_row_count(Some(n_row)),
+        )?;
+
+        let partitions = vec![batch];
+        Self::try_new_from_batches(Arc::clone(&schema), partitions)
+    }
+
+    /// Create a new plan using the provided schema and batches.
+    ///
+    /// Errors if any of the batches don't match the provided schema, or if no
+    /// batches are provided.
+    pub fn try_new_from_batches(
+        schema: SchemaRef,
+        batches: Vec<RecordBatch>,
+    ) -> Result<Self> {
+        if batches.is_empty() {
+            return plan_err!("Values list cannot be empty");
+        }
+
+        for batch in &batches {
+            let batch_schema = batch.schema();
+            if batch_schema != schema {
+                return plan_err!(
+                    "Batch has invalid schema. Expected: {}, got: {}",
+                    schema,
+                    batch_schema
+                );
+            }
+        }
+
+        let partitions = vec![batches];
+        let source = MemorySourceConfig {
+            partitions: partitions.clone(),
+            schema: Arc::clone(&schema),
+            projected_schema: Arc::clone(&schema),
+            projection: None,
+            sort_information: vec![],
+            show_sizes: true,
+            fetch: None,
+        };
+        let data_source = DataSourceExec::new(Arc::new(source));
+        Ok(Self {
+            inner: data_source,
+            partitions,
+            projection: None,
+            sort_information: vec![],
+            show_sizes: true,
+        })
+    }
+
+    fn memory_source_config(&self) -> MemorySourceConfig {
+        self.inner
+            .source()
+            .as_any()
+            .downcast_ref::<MemorySourceConfig>()
+            .unwrap()
+            .clone()
+    }
+
+    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
+        self.inner = self.inner.with_constraints(constraints);
+        self
+    }
+
+    /// Set `show_sizes` to determine whether to display partition sizes
+    pub fn with_show_sizes(mut self, show_sizes: bool) -> Self {
+        let mut memory_source = self.memory_source_config();
+        memory_source.show_sizes = show_sizes;
+        self.show_sizes = show_sizes;
+        self.inner = DataSourceExec::new(Arc::new(memory_source));
+        self
+    }
+
+    /// Ref to constraints
+    pub fn constraints(&self) -> &Constraints {
+        self.properties().equivalence_properties().constraints()
+    }
+
+    /// Ref to partitions
+    pub fn partitions(&self) -> &[Vec<RecordBatch>] {
+        &self.partitions
+    }
+
+    /// Ref to projection
+    pub fn projection(&self) -> &Option<Vec<usize>> {
+        &self.projection
+    }
+
+    /// Show sizes
+    pub fn show_sizes(&self) -> bool {
+        self.show_sizes
+    }
+
+    /// Ref to sort information
+    pub fn sort_information(&self) -> &[LexOrdering] {
+        &self.sort_information
+    }
+
+    /// A memory table can be ordered by multiple expressions simultaneously.
+    /// [`EquivalenceProperties`] keeps track of expressions that describe the
+    /// global ordering of the schema. These columns are not necessarily same; 
e.g.
+    /// ```text
+    /// ┌-------┐
+    /// | a | b |
+    /// |---|---|
+    /// | 1 | 9 |
+    /// | 2 | 8 |
+    /// | 3 | 7 |
+    /// | 5 | 5 |
+    /// └---┴---┘
+    /// ```
+    /// where both `a ASC` and `b DESC` can describe the table ordering. With
+    /// [`EquivalenceProperties`], we can keep track of these equivalences
+    /// and treat `a ASC` and `b DESC` as the same ordering requirement.
+    ///
+    /// Note that if there is an internal projection, that projection will be
+    /// also applied to the given `sort_information`.
+    pub fn try_with_sort_information(
+        mut self,
+        sort_information: Vec<LexOrdering>,
+    ) -> Result<Self> {
+        self.sort_information = sort_information.clone();
+        let mut memory_source = self.memory_source_config();
+        memory_source = 
memory_source.try_with_sort_information(sort_information)?;
+        self.inner = DataSourceExec::new(Arc::new(memory_source));
+        Ok(self)
+    }
+
+    /// Arc clone of ref to original schema
+    pub fn original_schema(&self) -> SchemaRef {
+        Arc::clone(&self.inner.schema())
+    }
+
+    /// This function creates the cache object that stores the plan properties 
such as schema, equivalence properties, ordering, partitioning, etc.
+    fn compute_properties(
+        schema: SchemaRef,
+        orderings: &[LexOrdering],
+        constraints: Constraints,
+        partitions: &[Vec<RecordBatch>],
+    ) -> PlanProperties {
+        PlanProperties::new(
+            EquivalenceProperties::new_with_orderings(schema, orderings)
+                .with_constraints(constraints),
+            Partitioning::UnknownPartitioning(partitions.len()),
+            EmissionType::Incremental,
+            Boundedness::Bounded,
+        )
+    }
+}
+
+/// Data source configuration for reading in-memory batches of data
+#[derive(Clone)]
+pub struct MemorySourceConfig {

Review Comment:
   I am just confirming that the plan is that datasource will have 
MemorySourceConfig / MemorySource and that we will move the other format 
specific things (like parquet) to their own crates like 
`datafusion-datasource-parquet` ?



##########
datafusion/physical-plan/src/joins/test_utils.rs:
##########
@@ -23,9 +23,9 @@ use crate::joins::utils::{JoinFilter, JoinOn};
 use crate::joins::{
     HashJoinExec, PartitionMode, StreamJoinPartitionMode, 
SymmetricHashJoinExec,
 };
-use crate::memory::MemorySourceConfig;
 use crate::repartition::RepartitionExec;
-use crate::source::DataSourceExec;
+use crate::test::MockMemorySourceConfig;
+// use crate::test::MockMemorySourceConfig;

Review Comment:
   ```suggestion
   ```



##########
datafusion/physical-plan/src/test.rs:
##########
@@ -17,27 +17,347 @@
 
 //! Utilities for testing datafusion-physical-plan
 
+use std::any::Any;
 use std::collections::HashMap;
+use std::fmt;
+use std::fmt::{Debug, Formatter};
 use std::pin::Pin;
 use std::sync::Arc;
+use std::task::Context;
+
+use crate::common;
+use crate::execution_plan::{Boundedness, EmissionType};
+use crate::memory::MemoryStream;
+use crate::metrics::MetricsSet;
+use crate::stream::RecordBatchStreamAdapter;
+use crate::streaming::PartitionStream;
+use crate::ExecutionPlan;
+use crate::{DisplayAs, DisplayFormatType, PlanProperties};
 
 use arrow::array::{Array, ArrayRef, Int32Array, RecordBatch};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::{
+    config::ConfigOptions, internal_err, project_schema, Result, Statistics,
+};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use futures::{Future, FutureExt};
+use datafusion_physical_expr::{
+    equivalence::ProjectionMapping, expressions::Column, 
utils::collect_columns,
+    EquivalenceProperties, LexOrdering, Partitioning,
+};
 
-use crate::memory::MemorySourceConfig;
-use crate::source::DataSourceExec;
-use crate::stream::RecordBatchStreamAdapter;
-use crate::streaming::PartitionStream;
-use crate::ExecutionPlan;
+use futures::{Future, FutureExt};
 
 pub mod exec;
 
+#[derive(Clone, Debug)]
+pub struct MockMemorySourceConfig {

Review Comment:
   Some suggestions:
   1. Document that this implements an in memory `DataSource` for testing 
purposes (and maybe we can add a link to the real memory source) and that it 
duplicates much of the `MemorySourceConfig` to avoid a dependency between 
physical-plan and datasource
   2. Rename it to something like `TestMemoryExec`  as this is not a config but 
rather something that `impl ExecutionPlan` and is used as a source of data in 
the tests 



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -1030,8 +1030,8 @@ impl EmbeddedProjection for NestedLoopJoinExec {
 #[cfg(test)]
 pub(crate) mod tests {
     use super::*;
-    use crate::memory::MemorySourceConfig;
-    use crate::source::DataSourceExec;
+    use crate::test::MockMemorySourceConfig;
+    // use crate::test::MockMemorySourceConfig;

Review Comment:
   left over?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to