This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ff55effd3 INSERT INTO support for MemTable (#5520)
ff55effd3 is described below

commit ff55effd3e0759a07b53071e357d7fc92b7879b8
Author: Metehan Yıldırım <[email protected]>
AuthorDate: Tue Mar 14 16:19:02 2023 +0300

    INSERT INTO support for MemTable (#5520)
    
    * Insert into memory table
    
    * Code simplifications
    
    * Minor comment refactor
    
    * Revamping tests and refactor code
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/core/src/datasource/datasource.rs       |  12 +-
 datafusion/core/src/datasource/memory.rs           | 223 ++++++++++++++++++++-
 datafusion/core/src/execution/context.rs           |  23 ++-
 .../sqllogictests/src/engines/datafusion/insert.rs |  93 ---------
 .../sqllogictests/src/engines/datafusion/mod.rs    |   3 -
 .../core/tests/sqllogictests/test_files/ddl.slt    |  28 ++-
 datafusion/expr/src/logical_plan/builder.rs        |  21 +-
 7 files changed, 292 insertions(+), 111 deletions(-)

diff --git a/datafusion/core/src/datasource/datasource.rs 
b/datafusion/core/src/datasource/datasource.rs
index 6277ce146..8db075a30 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -21,7 +21,7 @@ use std::any::Any;
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use datafusion_common::Statistics;
+use datafusion_common::{DataFusionError, Statistics};
 use datafusion_expr::{CreateExternalTable, LogicalPlan};
 pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
 
@@ -97,6 +97,16 @@ pub trait TableProvider: Sync + Send {
     fn statistics(&self) -> Option<Statistics> {
         None
     }
+
+    /// Insert into this table
+    async fn insert_into(
+        &self,
+        _state: &SessionState,
+        _input: &LogicalPlan,
+    ) -> Result<()> {
+        let msg = "Insertion not implemented for this table".to_owned();
+        Err(DataFusionError::NotImplemented(msg))
+    }
 }
 
 /// A factory which creates [`TableProvider`]s at runtime given a URL.
diff --git a/datafusion/core/src/datasource/memory.rs 
b/datafusion/core/src/datasource/memory.rs
index ac1f4947f..b5fa33e38 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -19,18 +19,22 @@
 //! queried by DataFusion. This allows data to be pre-loaded into memory and 
then
 //! repeatedly queried without incurring additional file I/O overhead.
 
-use futures::StreamExt;
+use futures::{StreamExt, TryStreamExt};
 use std::any::Any;
 use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
+use datafusion_expr::LogicalPlan;
+use tokio::sync::RwLock;
+use tokio::task;
 
 use crate::datasource::{TableProvider, TableType};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
+use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use crate::physical_plan::common;
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::memory::MemoryExec;
@@ -41,7 +45,7 @@ use crate::physical_plan::{repartition::RepartitionExec, 
Partitioning};
 #[derive(Debug)]
 pub struct MemTable {
     schema: SchemaRef,
-    batches: Vec<Vec<RecordBatch>>,
+    batches: Arc<RwLock<Vec<Vec<RecordBatch>>>>,
 }
 
 impl MemTable {
@@ -54,7 +58,7 @@ impl MemTable {
         {
             Ok(Self {
                 schema,
-                batches: partitions,
+                batches: Arc::new(RwLock::new(partitions)),
             })
         } else {
             Err(DataFusionError::Plan(
@@ -143,22 +147,102 @@ impl TableProvider for MemTable {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
+        let batches = &self.batches.read().await;
         Ok(Arc::new(MemoryExec::try_new(
-            &self.batches.clone(),
+            batches,
             self.schema(),
             projection.cloned(),
         )?))
     }
+
+    /// Inserts the execution results of a given [LogicalPlan] into this 
[MemTable].
+    /// The `LogicalPlan` must have the same schema as this `MemTable`.
+    ///
+    /// # Arguments
+    ///
+    /// * `state` - The [SessionState] containing the context for executing 
the plan.
+    /// * `input` - The [LogicalPlan] to execute and insert.
+    ///
+    /// # Returns
+    ///
+    /// * A `Result` indicating success or failure.
+    async fn insert_into(&self, state: &SessionState, input: &LogicalPlan) -> 
Result<()> {
+        // Create a physical plan from the logical plan.
+        let plan = state.create_physical_plan(input).await?;
+
+        // Check that the schema of the plan matches the schema of this table.
+        if !plan.schema().eq(&self.schema) {
+            return Err(DataFusionError::Plan(
+                "Inserting query must have the same schema with the 
table.".to_string(),
+            ));
+        }
+
+        // Get the number of partitions in the plan and the table.
+        let plan_partition_count = 
plan.output_partitioning().partition_count();
+        let table_partition_count = self.batches.read().await.len();
+
+        // Adjust the plan as necessary to match the number of partitions in 
the table.
+        let plan: Arc<dyn ExecutionPlan> = if plan_partition_count
+            == table_partition_count
+            || table_partition_count == 0
+        {
+            plan
+        } else if table_partition_count == 1 {
+            // If the table has only one partition, coalesce the partitions in 
the plan.
+            Arc::new(CoalescePartitionsExec::new(plan))
+        } else {
+            // Otherwise, repartition the plan using a round-robin 
partitioning scheme.
+            Arc::new(RepartitionExec::try_new(
+                plan,
+                Partitioning::RoundRobinBatch(table_partition_count),
+            )?)
+        };
+
+        // Get the task context from the session state.
+        let task_ctx = state.task_ctx();
+
+        // Execute the plan and collect the results into batches.
+        let mut tasks = vec![];
+        for idx in 0..plan.output_partitioning().partition_count() {
+            let stream = plan.execute(idx, task_ctx.clone())?;
+            let handle = task::spawn(async move {
+                stream.try_collect().await.map_err(DataFusionError::from)
+            });
+            tasks.push(AbortOnDropSingle::new(handle));
+        }
+        let results = futures::future::join_all(tasks)
+            .await
+            .into_iter()
+            .map(|result| {
+                result.map_err(|e| DataFusionError::Execution(format!("{e}")))?
+            })
+            .collect::<Result<Vec<Vec<RecordBatch>>>>()?;
+
+        // Write the results into the table.
+        let mut all_batches = self.batches.write().await;
+
+        if all_batches.is_empty() {
+            *all_batches = results
+        } else {
+            for (batches, result) in 
all_batches.iter_mut().zip(results.into_iter()) {
+                batches.extend(result);
+            }
+        }
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::datasource::provider_as_source;
     use crate::from_slice::FromSlice;
     use crate::prelude::SessionContext;
     use arrow::array::Int32Array;
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::error::ArrowError;
+    use datafusion_expr::LogicalPlanBuilder;
     use futures::StreamExt;
     use std::collections::HashMap;
 
@@ -388,4 +472,135 @@ mod tests {
 
         Ok(())
     }
+
+    fn create_mem_table_scan(
+        schema: SchemaRef,
+        data: Vec<Vec<RecordBatch>>,
+    ) -> Result<Arc<LogicalPlan>> {
+        // Convert the table into a provider so that it can be used in a query
+        let provider = provider_as_source(Arc::new(MemTable::try_new(schema, 
data)?));
+        // Create a table scan logical plan to read from the table
+        Ok(Arc::new(
+            LogicalPlanBuilder::scan("source", provider, None)?.build()?,
+        ))
+    }
+
+    fn create_initial_ctx() -> Result<(SessionContext, SchemaRef, 
RecordBatch)> {
+        // Create a new session context
+        let session_ctx = SessionContext::new();
+        // Create a new schema with one field called "a" of type Int32
+        let schema = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Int32, false)]));
+
+        // Create a new batch of data to insert into the table
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from_slice([1, 2, 3]))],
+        )?;
+        Ok((session_ctx, schema, batch))
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_single_partition() -> Result<()> {
+        let (session_ctx, schema, batch) = create_initial_ctx()?;
+        let initial_table = Arc::new(MemTable::try_new(
+            schema.clone(),
+            vec![vec![batch.clone()]],
+        )?);
+        // Create a table scan logical plan to read from the table
+        let single_partition_table_scan =
+            create_mem_table_scan(schema.clone(), vec![vec![batch.clone()]])?;
+        // Insert the data from the provider into the table
+        initial_table
+            .insert_into(&session_ctx.state(), &single_partition_table_scan)
+            .await?;
+        // Ensure that the table now contains two batches of data in the same 
partition
+        assert_eq!(initial_table.batches.read().await.get(0).unwrap().len(), 
2);
+
+        // Create a new provider with 2 partitions
+        let multi_partition_table_scan = create_mem_table_scan(
+            schema.clone(),
+            vec![vec![batch.clone()], vec![batch]],
+        )?;
+
+        // Insert the data from the provider into the table. We expect 
coalescing partitions.
+        initial_table
+            .insert_into(&session_ctx.state(), &multi_partition_table_scan)
+            .await?;
+        // Ensure that the table now contains 4 batches of data with only 1 
partition
+        assert_eq!(initial_table.batches.read().await.get(0).unwrap().len(), 
4);
+        assert_eq!(initial_table.batches.read().await.len(), 1);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_multiple_partition() -> Result<()> {
+        let (session_ctx, schema, batch) = create_initial_ctx()?;
+        // create a memory table with two partitions, each having one batch 
with the same data
+        let initial_table = Arc::new(MemTable::try_new(
+            schema.clone(),
+            vec![vec![batch.clone()], vec![batch.clone()]],
+        )?);
+
+        // scan a data source provider from a memory table with a single 
partition
+        let single_partition_table_scan = create_mem_table_scan(
+            schema.clone(),
+            vec![vec![batch.clone(), batch.clone()]],
+        )?;
+
+        // insert the data from the 1 partition data source provider into the 
initial table
+        initial_table
+            .insert_into(&session_ctx.state(), &single_partition_table_scan)
+            .await?;
+
+        // We expect round robin repartition here, each partition gets 1 batch.
+        assert_eq!(initial_table.batches.read().await.get(0).unwrap().len(), 
2);
+        assert_eq!(initial_table.batches.read().await.get(1).unwrap().len(), 
2);
+
+        // scan a data source provider from a memory table with 2 partition
+        let multi_partition_table_scan = create_mem_table_scan(
+            schema.clone(),
+            vec![vec![batch.clone()], vec![batch]],
+        )?;
+        // We expect one-to-one partition mapping.
+        initial_table
+            .insert_into(&session_ctx.state(), &multi_partition_table_scan)
+            .await?;
+        // Ensure that the table now contains 3 batches of data with 2 
partitions.
+        assert_eq!(initial_table.batches.read().await.get(0).unwrap().len(), 
3);
+        assert_eq!(initial_table.batches.read().await.get(1).unwrap().len(), 
3);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_empty_table() -> Result<()> {
+        let (session_ctx, schema, batch) = create_initial_ctx()?;
+        // create empty memory table
+        let initial_table = Arc::new(MemTable::try_new(schema.clone(), 
vec![])?);
+
+        // scan a data source provider from a memory table with a single 
partition
+        let single_partition_table_scan = create_mem_table_scan(
+            schema.clone(),
+            vec![vec![batch.clone(), batch.clone()]],
+        )?;
+
+        // insert the data from the 1 partition data source provider into the 
initial table
+        initial_table
+            .insert_into(&session_ctx.state(), &single_partition_table_scan)
+            .await?;
+
+        assert_eq!(initial_table.batches.read().await.get(0).unwrap().len(), 
2);
+
+        // scan a data source provider from a memory table with 2 partition
+        let single_partition_table_scan = create_mem_table_scan(
+            schema.clone(),
+            vec![vec![batch.clone()], vec![batch]],
+        )?;
+        // We expect coalesce partitions here.
+        initial_table
+            .insert_into(&session_ctx.state(), &single_partition_table_scan)
+            .await?;
+        // Ensure that the table now contains 3 batches of data with 2 
partitions.
+        assert_eq!(initial_table.batches.read().await.get(0).unwrap().len(), 
4);
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index c985ccdc5..aa02e4be3 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -31,7 +31,7 @@ use crate::{
         optimizer::PhysicalOptimizerRule,
     },
 };
-use datafusion_expr::{DescribeTable, StringifiedPlan};
+use datafusion_expr::{DescribeTable, DmlStatement, StringifiedPlan, WriteOp};
 pub use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::var_provider::is_system_variables;
 use parking_lot::RwLock;
@@ -308,7 +308,8 @@ impl SessionContext {
 
     /// Creates a [`DataFrame`] that will execute a SQL query.
     ///
-    /// Note: This API implements DDL such as `CREATE TABLE` and `CREATE VIEW` 
with in-memory
+    /// Note: This API implements DDL statements such as `CREATE TABLE` and
+    /// `CREATE VIEW` and DML statements such as `INSERT INTO` with in-memory
     /// default implementations.
     ///
     /// If this is not desirable, consider using 
[`SessionState::create_logical_plan()`] which
@@ -318,6 +319,24 @@ impl SessionContext {
         let plan = self.state().create_logical_plan(sql).await?;
 
         match plan {
+            LogicalPlan::Dml(DmlStatement {
+                table_name,
+                op: WriteOp::Insert,
+                input,
+                ..
+            }) => {
+                if self.table_exist(&table_name)? {
+                    let name = table_name.table();
+                    let provider = self.table_provider(name).await?;
+                    provider.insert_into(&self.state(), &input).await?;
+                } else {
+                    return Err(DataFusionError::Execution(format!(
+                        "Table '{}' does not exist",
+                        table_name
+                    )));
+                }
+                self.return_empty_dataframe()
+            }
             LogicalPlan::CreateExternalTable(cmd) => {
                 self.create_external_table(&cmd).await
             }
diff --git 
a/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs 
b/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs
deleted file mode 100644
index a8fca3b16..000000000
--- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/insert.rs
+++ /dev/null
@@ -1,93 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use super::error::Result;
-use crate::engines::datafusion::util::LogicTestContextProvider;
-use crate::engines::output::DFOutput;
-use arrow::record_batch::RecordBatch;
-use datafusion::datasource::MemTable;
-use datafusion::prelude::SessionContext;
-use datafusion_common::{DFSchema, DataFusionError};
-use datafusion_expr::Expr as DFExpr;
-use datafusion_sql::planner::{object_name_to_table_reference, PlannerContext, 
SqlToRel};
-use sqllogictest::DBOutput;
-use sqlparser::ast::{Expr, SetExpr, Statement as SQLStatement};
-use std::sync::Arc;
-
-pub async fn insert(ctx: &SessionContext, insert_stmt: SQLStatement) -> 
Result<DFOutput> {
-    // First, use sqlparser to get table name and insert values
-    let table_reference;
-    let insert_values: Vec<Vec<Expr>>;
-    match insert_stmt {
-        SQLStatement::Insert {
-            table_name, source, ..
-        } => {
-            table_reference = object_name_to_table_reference(
-                table_name,
-                ctx.enable_ident_normalization(),
-            )?;
-
-            // Todo: check columns match table schema
-            match *source.body {
-                SetExpr::Values(values) => {
-                    insert_values = values.rows;
-                }
-                _ => {
-                    // Directly panic: make it easy to find the location of 
the error.
-                    panic!()
-                }
-            }
-        }
-        _ => unreachable!(),
-    }
-
-    // Second, get batches in table and destroy the old table
-    let mut origin_batches = 
ctx.table(&table_reference).await?.collect().await?;
-    let schema = ctx.table_provider(&table_reference).await?.schema();
-    ctx.deregister_table(&table_reference)?;
-
-    // Third, transfer insert values to `RecordBatch`
-    // Attention: schema info can be ignored. (insert values don't contain 
schema info)
-    let sql_to_rel = SqlToRel::new(&LogicTestContextProvider {});
-    let num_rows = insert_values.len();
-    for row in insert_values.into_iter() {
-        let logical_exprs = row
-            .into_iter()
-            .map(|expr| {
-                sql_to_rel.sql_to_expr(
-                    expr,
-                    &DFSchema::empty(),
-                    &mut PlannerContext::new(),
-                )
-            })
-            .collect::<Result<Vec<DFExpr>, DataFusionError>>()?;
-        // Directly use `select` to get `RecordBatch`
-        let dataframe = ctx.read_empty()?;
-        
origin_batches.extend(dataframe.select(logical_exprs)?.collect().await?)
-    }
-
-    // Replace new batches schema to old schema
-    for batch in origin_batches.iter_mut() {
-        *batch = RecordBatch::try_new(schema.clone(), 
batch.columns().to_vec())?;
-    }
-
-    // Final, create new memtable with same schema.
-    let new_provider = MemTable::try_new(schema, vec![origin_batches])?;
-    ctx.register_table(&table_reference, Arc::new(new_provider))?;
-
-    Ok(DBOutput::StatementComplete(num_rows as u64))
-}
diff --git a/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs 
b/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs
index 1f8f7feb3..cdd6663a5 100644
--- a/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs
+++ b/datafusion/core/tests/sqllogictests/src/engines/datafusion/mod.rs
@@ -26,13 +26,11 @@ use create_table::create_table;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::prelude::SessionContext;
 use datafusion_sql::parser::{DFParser, Statement};
-use insert::insert;
 use sqllogictest::DBOutput;
 use sqlparser::ast::Statement as SQLStatement;
 
 mod create_table;
 mod error;
-mod insert;
 mod normalize;
 mod util;
 
@@ -85,7 +83,6 @@ async fn run_query(ctx: &SessionContext, sql: impl 
Into<String>) -> Result<DFOut
         if let Statement::Statement(statement) = statement0 {
             let statement = *statement;
             match statement {
-                SQLStatement::Insert { .. } => return insert(ctx, 
statement).await,
                 SQLStatement::CreateTable {
                     query,
                     constraints,
diff --git a/datafusion/core/tests/sqllogictests/test_files/ddl.slt 
b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
index 642093c36..59bfc91b5 100644
--- a/datafusion/core/tests/sqllogictests/test_files/ddl.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/ddl.slt
@@ -63,7 +63,7 @@ statement error Table 'user' doesn't exist.
 DROP TABLE user;
 
 # Can not insert into a undefined table
-statement error No table named 'user'
+statement error DataFusion error: Error during planning: table 
'datafusion.public.user' not found
 insert into user values(1, 20);
 
 ##########
@@ -421,9 +421,27 @@ statement ok
 DROP TABLE aggregate_simple
 
 
+# sql_table_insert
+statement ok
+CREATE TABLE abc AS VALUES (1,2,3), (4,5,6);
+
+statement ok
+CREATE TABLE xyz AS VALUES (1,3,3), (5,5,6);
+
+statement ok
+INSERT INTO abc SELECT * FROM xyz;
+
+query III
+SELECT * FROM abc
+----
+1 2 3
+4 5 6
+1 3 3
+5 5 6
+
 # Should create an empty table
 statement ok
-CREATE TABLE table_without_values(field1 BIGINT, field2 BIGINT);
+CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL);
 
 
 # Should skip existing table
@@ -444,8 +462,8 @@ CREATE OR REPLACE TABLE IF NOT EXISTS 
table_without_values(field1 BIGINT, field2
 statement ok
 insert into table_without_values values (1, 2), (2, 3), (2, 4);
 
-query II rowsort
-select * from table_without_values;
+query II
+select * from table_without_values
 ----
 1 2
 2 3
@@ -454,7 +472,7 @@ select * from table_without_values;
 
 # Should recreate existing table
 statement ok
-CREATE OR REPLACE TABLE table_without_values(field1 BIGINT, field2 BIGINT);
+CREATE OR REPLACE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT 
NULL);
 
 
 # Should insert into a recreated table
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index a1fa82cda..9be046934 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -24,7 +24,7 @@ use crate::expr_rewriter::{
 };
 use crate::type_coercion::binary::comparison_coercion;
 use crate::utils::{columnize_expr, compare_sort_expr, exprlist_to_fields, 
from_plan};
-use crate::{and, binary_expr, Operator};
+use crate::{and, binary_expr, DmlStatement, Operator, WriteOp};
 use crate::{
     logical_plan::{
         Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, 
Filter, Join,
@@ -40,8 +40,8 @@ use crate::{
 };
 use arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion_common::{
-    Column, DFField, DFSchema, DFSchemaRef, DataFusionError, Result, 
ScalarValue,
-    ToDFSchema,
+    Column, DFField, DFSchema, DFSchemaRef, DataFusionError, 
OwnedTableReference, Result,
+    ScalarValue, ToDFSchema,
 };
 use std::any::Any;
 use std::cmp::Ordering;
@@ -201,6 +201,21 @@ impl LogicalPlanBuilder {
         Self::scan_with_filters(table_name, table_source, projection, vec![])
     }
 
+    /// Create a [DmlStatement] for inserting the contents of this builder 
into the named table
+    pub fn insert_into(
+        input: LogicalPlan,
+        table_name: impl Into<OwnedTableReference>,
+        table_schema: &Schema,
+    ) -> Result<Self> {
+        let table_schema = table_schema.clone().to_dfschema_ref()?;
+        Ok(Self::from(LogicalPlan::Dml(DmlStatement {
+            table_name: table_name.into(),
+            table_schema,
+            op: WriteOp::Insert,
+            input: Arc::new(input),
+        })))
+    }
+
     /// Convert a table provider into a builder with a TableScan
     pub fn scan_with_filters(
         table_name: impl Into<String>,

Reply via email to