alamb commented on code in PR #5520:
URL: https://github.com/apache/arrow-datafusion/pull/5520#discussion_r1131468017


##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -388,4 +465,115 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_insert_into_single_partition() -> Result<()> {
+        // Create a new session context
+        let session_ctx = SessionContext::new();
+        // Create a new schema with one field called "a" of type Int32
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int32, false)]));
+
+        // Create a new batch of data to insert into the table
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from_slice([1, 2, 3]))],
+        )?;
+        // Create a new table with one partition that contains the batch of 
data
+        let initial_table = Arc::new(MemTable::try_new(

Review Comment:
   I think you could make these tests significantly less verbose (and hence 
easier to read) if you created a function that created a mem table from a 
schema and batches:
   
   ```
           let multi_partition_provider = create_mem_table(&schema, 
[batch.clone()]);
   ```



##########
datafusion/core/src/execution/context.rs:
##########
@@ -2714,6 +2728,46 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn sql_table_insert() -> Result<()> {

Review Comment:
   Would it be possible to write this as a sqllogictest 
https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/sqllogictests
 (after updating it to use the new codepath)?



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -203,6 +203,24 @@ impl LogicalPlanBuilder {
         Self::scan_with_filters(table_name, table_source, projection, vec![])
     }
 
+    /// Convert a logical plan into a builder with a [DmlStatement]

Review Comment:
   ```suggestion
       /// Create a [DmlStatement] for inserting the contents of this builder 
into the named table
   ```



##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -143,22 +147,95 @@ impl TableProvider for MemTable {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let batches = &self.batches.read().await;
         Ok(Arc::new(MemoryExec::try_new(
-            &self.batches.clone(),
+            batches,
             self.schema(),
             projection.cloned(),
         )?))
     }
+
+    /// Inserts the execution results of a given [LogicalPlan] into this 
[MemTable].
+    /// The `LogicalPlan` must have the same schema as this `MemTable`.
+    ///
+    /// # Arguments
+    ///
+    /// * `state` - The [SessionState] containing the context for executing 
the plan.
+    /// * `input` - The [LogicalPlan] to execute and insert.
+    ///
+    /// # Returns
+    ///
+    /// * A `Result` indicating success or failure.
+    async fn insert_into(&self, state: &SessionState, input: &LogicalPlan) -> 
Result<()> {
+        // Create a physical plan from the logical plan.
+        let plan = state.create_physical_plan(input).await?;
+
+        // Check that the schema of the plan matches the schema of this table.
+        if !plan.schema().eq(&self.schema) {
+            return Err(DataFusionError::Plan(
+                "Inserting query must have the same schema with the 
table.".to_string(),
+            ));
+        }
+
+        // Get the number of partitions in the plan and the table.
+        let plan_partition_count = 
plan.output_partitioning().partition_count();
+        let table_partition_count = self.batches.read().await.len();
+
+        // Adjust the plan as necessary to match the number of partitions in 
the table.
+        let plan: Arc<dyn ExecutionPlan> =
+            if plan_partition_count == table_partition_count {
+                plan
+            } else if table_partition_count == 1 {
+                // If the table has only one partition, coalesce the 
partitions in the plan.
+                Arc::new(CoalescePartitionsExec::new(plan))
+            } else {
+                // Otherwise, repartition the plan using a round-robin 
partitioning scheme.
+                Arc::new(RepartitionExec::try_new(
+                    plan,
+                    Partitioning::RoundRobinBatch(table_partition_count),
+                )?)
+            };
+
+        // Get the task context from the session state.
+        let task_ctx = state.task_ctx();
+
+        // Execute the plan and collect the results into batches.
+        let mut tasks = vec![];
+        for idx in 0..table_partition_count {

Review Comment:
   this will try and run all the partitions in parallel, which is probably fine 
(though maybe we want to limit it to the target_partitions 🤔 )
   
   I don't think this is important to fix now



##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -388,4 +465,115 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_insert_into_single_partition() -> Result<()> {
+        // Create a new session context
+        let session_ctx = SessionContext::new();
+        // Create a new schema with one field called "a" of type Int32
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int32, false)]));
+
+        // Create a new batch of data to insert into the table
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from_slice([1, 2, 3]))],
+        )?;
+        // Create a new table with one partition that contains the batch of 
data
+        let initial_table = Arc::new(MemTable::try_new(
+            schema.clone(),
+            vec![vec![batch.clone()]],
+        )?);
+        // Convert the table into a provider so that it can be used in a query
+        let provider = provider_as_source(Arc::new(MemTable::try_new(
+            schema.clone(),
+            vec![vec![batch.clone()]],
+        )?));
+        // Create a table scan logical plan to read from the table
+        let table_scan =
+            Arc::new(LogicalPlanBuilder::scan("source", provider, 
None)?.build()?);
+        // Insert the data from the provider into the table
+        initial_table
+            .insert_into(&session_ctx.state(), &table_scan)
+            .await?;
+        // Ensure that the table now contains two batches of data in the same 
partition
+        assert_eq!(initial_table.batches.read().await.get(0).unwrap().len(), 
2);
+
+        // Create a new provider with 2 partitions
+        let multi_partition_provider = 
provider_as_source(Arc::new(MemTable::try_new(
+            schema.clone(),
+            vec![vec![batch.clone()], vec![batch]],
+        )?));
+        // Create a new table scan logical plan to read from the provider
+        let table_scan = Arc::new(
+            LogicalPlanBuilder::scan("source", multi_partition_provider, None)?
+                .build()?,
+        );

Review Comment:
   Similarly, I think you can refactor this into a function; it would make the 
tests easier to follow:
   
   ```rust
   let table_scan = create_table_scan(multi_partition_provider);
   ```
   



##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -203,6 +203,24 @@ impl LogicalPlanBuilder {
         Self::scan_with_filters(table_name, table_source, projection, vec![])
     }
 
+    /// Convert a logical plan into a builder with a [DmlStatement]
+    pub fn insert_into(
+        input: LogicalPlan,
+        table_name: impl Into<String>,

Review Comment:
   I think this will (logically) conflict with changes @Jefffrey  is making in 
https://github.com/apache/arrow-datafusion/pull/5343



##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -143,22 +147,95 @@ impl TableProvider for MemTable {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let batches = &self.batches.read().await;
         Ok(Arc::new(MemoryExec::try_new(
-            &self.batches.clone(),
+            batches,
             self.schema(),
             projection.cloned(),
         )?))
     }
+
+    /// Inserts the execution results of a given [LogicalPlan] into this 
[MemTable].
+    /// The `LogicalPlan` must have the same schema as this `MemTable`.
+    ///
+    /// # Arguments
+    ///
+    /// * `state` - The [SessionState] containing the context for executing 
the plan.
+    /// * `input` - The [LogicalPlan] to execute and insert.
+    ///
+    /// # Returns
+    ///
+    /// * A `Result` indicating success or failure.
+    async fn insert_into(&self, state: &SessionState, input: &LogicalPlan) -> 
Result<()> {
+        // Create a physical plan from the logical plan.
+        let plan = state.create_physical_plan(input).await?;
+
+        // Check that the schema of the plan matches the schema of this table.

Review Comment:
   What would the alternate behavior be? Pad missing columns with nulls?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to