This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3892499b28 Support REPLACE INTO for INSERT statements (#12516)
3892499b28 is described below

commit 3892499b28135f9862ffac3e4f19524bb5b77c59
Author: Fredrik Meringdal <[email protected]>
AuthorDate: Sat Sep 28 18:08:37 2024 +0200

    Support REPLACE INTO for INSERT statements (#12516)
    
    * Add support for REPLACE INTO statements.
    
    This commit introduces an `InsertOp` enum to replace the boolean
`overwrite` flag to provide clearer and more flexible control over how
    data is inserted. This change updates the following APIs and configs to
    reflect the change: `TableProvider::insert_into`, `FileSinkConfig` and
    `DataFrameWriteOptions`.
    
    * fix clippy and add license
    
    * Update vendored code
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/catalog/src/table.rs                    |   3 +-
 datafusion/core/src/dataframe/mod.rs               |  35 ++--
 datafusion/core/src/dataframe/parquet.rs           |  10 +-
 .../core/src/datasource/file_format/arrow.rs       |   3 +-
 datafusion/core/src/datasource/file_format/csv.rs  |   3 +-
 datafusion/core/src/datasource/file_format/json.rs |   3 +-
 .../core/src/datasource/file_format/parquet.rs     |   9 +-
 datafusion/core/src/datasource/listing/table.rs    |   8 +-
 datafusion/core/src/datasource/memory.rs           |  10 +-
 .../core/src/datasource/physical_plan/mod.rs       |   6 +-
 datafusion/core/src/datasource/stream.rs           |   3 +-
 datafusion/core/src/physical_planner.rs            |  24 +--
 .../core/tests/user_defined/insert_operation.rs    | 188 +++++++++++++++++++++
 datafusion/core/tests/user_defined/mod.rs          |   3 +
 datafusion/expr/src/logical_plan/builder.rs        |  11 +-
 datafusion/expr/src/logical_plan/dml.rs            |  37 +++-
 datafusion/proto/proto/datafusion.proto            |   9 +-
 datafusion/proto/src/generated/pbjson.rs           | 109 ++++++++++--
 datafusion/proto/src/generated/prost.rs            |  33 +++-
 datafusion/proto/src/physical_plan/from_proto.rs   |   8 +-
 datafusion/proto/src/physical_plan/to_proto.rs     |   2 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |   7 +-
 datafusion/sql/src/statement.rs                    |  30 ++--
 23 files changed, 447 insertions(+), 107 deletions(-)

diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index 6c36d907ac..ca3a2bef88 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -25,6 +25,7 @@ use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion_common::Result;
 use datafusion_common::{not_impl_err, Constraints, Statistics};
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{
     CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, 
TableType,
 };
@@ -274,7 +275,7 @@ pub trait TableProvider: Debug + Sync + Send {
         &self,
         _state: &dyn Session,
         _input: Arc<dyn ExecutionPlan>,
-        _overwrite: bool,
+        _insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         not_impl_err!("Insert into not implemented for this table")
     }
diff --git a/datafusion/core/src/dataframe/mod.rs 
b/datafusion/core/src/dataframe/mod.rs
index 72b763ce0f..70c5075114 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -52,6 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
 use datafusion_common::{
     plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, 
UnnestOptions,
 };
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{case, is_null, lit, SortExpr};
 use datafusion_expr::{
     utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
@@ -66,8 +67,9 @@ use datafusion_catalog::Session;
 /// Contains options that control how data is
 /// written out from a DataFrame
 pub struct DataFrameWriteOptions {
-    /// Controls if existing data should be overwritten
-    overwrite: bool,
+    /// Controls how new data should be written to the table, determining 
whether
+    /// to append, overwrite, or replace existing data.
+    insert_op: InsertOp,
     /// Controls if all partitions should be coalesced into a single output 
file
     /// Generally will have slower performance when set to true.
     single_file_output: bool,
@@ -80,14 +82,15 @@ impl DataFrameWriteOptions {
     /// Create a new DataFrameWriteOptions with default values
     pub fn new() -> Self {
         DataFrameWriteOptions {
-            overwrite: false,
+            insert_op: InsertOp::Append,
             single_file_output: false,
             partition_by: vec![],
         }
     }
-    /// Set the overwrite option to true or false
-    pub fn with_overwrite(mut self, overwrite: bool) -> Self {
-        self.overwrite = overwrite;
+
+    /// Set the insert operation
+    pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
+        self.insert_op = insert_op;
         self
     }
 
@@ -1525,7 +1528,7 @@ impl DataFrame {
             self.plan,
             table_name.to_owned(),
             &arrow_schema,
-            write_options.overwrite,
+            write_options.insert_op,
         )?
         .build()?;
 
@@ -1566,10 +1569,11 @@ impl DataFrame {
         options: DataFrameWriteOptions,
         writer_options: Option<CsvOptions>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
-        if options.overwrite {
-            return Err(DataFusionError::NotImplemented(
-                "Overwrites are not implemented for 
DataFrame::write_csv.".to_owned(),
-            ));
+        if options.insert_op != InsertOp::Append {
+            return Err(DataFusionError::NotImplemented(format!(
+                "{} is not implemented for DataFrame::write_csv.",
+                options.insert_op
+            )));
         }
 
         let format = if let Some(csv_opts) = writer_options {
@@ -1626,10 +1630,11 @@ impl DataFrame {
         options: DataFrameWriteOptions,
         writer_options: Option<JsonOptions>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
-        if options.overwrite {
-            return Err(DataFusionError::NotImplemented(
-                "Overwrites are not implemented for 
DataFrame::write_json.".to_owned(),
-            ));
+        if options.insert_op != InsertOp::Append {
+            return Err(DataFusionError::NotImplemented(format!(
+                "{} is not implemented for DataFrame::write_json.",
+                options.insert_op
+            )));
         }
 
         let format = if let Some(json_opts) = writer_options {
diff --git a/datafusion/core/src/dataframe/parquet.rs 
b/datafusion/core/src/dataframe/parquet.rs
index 66974e37f4..f90b35fde6 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -26,6 +26,7 @@ use super::{
 };
 
 use datafusion_common::config::TableParquetOptions;
+use datafusion_expr::dml::InsertOp;
 
 impl DataFrame {
     /// Execute the `DataFrame` and write the results to Parquet file(s).
@@ -57,10 +58,11 @@ impl DataFrame {
         options: DataFrameWriteOptions,
         writer_options: Option<TableParquetOptions>,
     ) -> Result<Vec<RecordBatch>, DataFusionError> {
-        if options.overwrite {
-            return Err(DataFusionError::NotImplemented(
-                "Overwrites are not implemented for 
DataFrame::write_parquet.".to_owned(),
-            ));
+        if options.insert_op != InsertOp::Append {
+            return Err(DataFusionError::NotImplemented(format!(
+                "{} is not implemented for DataFrame::write_parquet.",
+                options.insert_op
+            )));
         }
 
         let format = if let Some(parquet_opts) = writer_options {
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs 
b/datafusion/core/src/datasource/file_format/arrow.rs
index 6ee4280956..c10ebbd6c9 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -47,6 +47,7 @@ use datafusion_common::{
     not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
 };
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -181,7 +182,7 @@ impl FileFormat for ArrowFormat {
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.overwrite {
+        if conf.insert_op != InsertOp::Append {
             return not_impl_err!("Overwrites are not implemented yet for Arrow 
format");
         }
 
diff --git a/datafusion/core/src/datasource/file_format/csv.rs 
b/datafusion/core/src/datasource/file_format/csv.rs
index 99e8f13776..e821fa806f 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -46,6 +46,7 @@ use datafusion_common::{
     exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
 };
 use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::metrics::MetricsSet;
 
@@ -382,7 +383,7 @@ impl FileFormat for CsvFormat {
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.overwrite {
+        if conf.insert_op != InsertOp::Append {
             return not_impl_err!("Overwrites are not implemented yet for CSV");
         }
 
diff --git a/datafusion/core/src/datasource/file_format/json.rs 
b/datafusion/core/src/datasource/file_format/json.rs
index 4471d7d6cb..c9ed0c0d28 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -46,6 +46,7 @@ use datafusion_common::config::{ConfigField, ConfigFileType, 
JsonOptions};
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
 use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::metrics::MetricsSet;
 use datafusion_physical_plan::ExecutionPlan;
@@ -252,7 +253,7 @@ impl FileFormat for JsonFormat {
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.overwrite {
+        if conf.insert_op != InsertOp::Append {
             return not_impl_err!("Overwrites are not implemented yet for 
Json");
         }
 
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs 
b/datafusion/core/src/datasource/file_format/parquet.rs
index 35296b0d79..98ae0ce14b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -53,6 +53,7 @@ use datafusion_common::{
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, 
MemoryReservation};
 use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::Expr;
 use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
 use datafusion_physical_expr::PhysicalExpr;
@@ -403,7 +404,7 @@ impl FileFormat for ParquetFormat {
         conf: FileSinkConfig,
         order_requirements: Option<LexRequirement>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        if conf.overwrite {
+        if conf.insert_op != InsertOp::Append {
             return not_impl_err!("Overwrites are not implemented yet for 
Parquet");
         }
 
@@ -2269,7 +2270,7 @@ mod tests {
             table_paths: vec![ListingTableUrl::parse("file:///")?],
             output_schema: schema.clone(),
             table_partition_cols: vec![],
-            overwrite: true,
+            insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
@@ -2364,7 +2365,7 @@ mod tests {
             table_paths: vec![ListingTableUrl::parse("file:///")?],
             output_schema: schema.clone(),
             table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // 
add partitioning
-            overwrite: true,
+            insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
@@ -2447,7 +2448,7 @@ mod tests {
                 table_paths: vec![ListingTableUrl::parse("file:///")?],
                 output_schema: schema.clone(),
                 table_partition_cols: vec![],
-                overwrite: true,
+                insert_op: InsertOp::Overwrite,
                 keep_partition_by_columns: false,
             };
             let parquet_sink = Arc::new(ParquetSink::new(
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 2a35fddeb0..3eb8eed9de 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -34,6 +34,7 @@ use crate::datasource::{
 use crate::execution::context::SessionState;
 use datafusion_catalog::TableProvider;
 use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
 use datafusion_expr::{SortExpr, TableType};
 use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
@@ -916,7 +917,7 @@ impl TableProvider for ListingTable {
         &self,
         state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
-        overwrite: bool,
+        insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Check that the schema of the plan matches the schema of this table.
         if !self
@@ -975,7 +976,7 @@ impl TableProvider for ListingTable {
             file_groups,
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
-            overwrite,
+            insert_op,
             keep_partition_by_columns,
         };
 
@@ -1990,7 +1991,8 @@ mod tests {
         // Therefore, we will have 8 partitions in the final plan.
         // Create an insert plan to insert the source data into the initial 
table
         let insert_into_table =
-            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, 
false)?.build()?;
+            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, 
InsertOp::Append)?
+                .build()?;
         // Create a physical plan from the insert plan
         let plan = session_ctx
             .state()
diff --git a/datafusion/core/src/datasource/memory.rs 
b/datafusion/core/src/datasource/memory.rs
index 70f3c36b81..24a4938e7b 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -39,6 +39,7 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, 
SchemaExt};
 use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
 use datafusion_physical_plan::metrics::MetricsSet;
 
 use async_trait::async_trait;
@@ -262,7 +263,7 @@ impl TableProvider for MemTable {
         &self,
         _state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
-        overwrite: bool,
+        insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // If we are inserting into the table, any sort order may be messed up 
so reset it here
         *self.sort_order.lock() = vec![];
@@ -289,8 +290,8 @@ impl TableProvider for MemTable {
                     .collect::<Vec<_>>()
             );
         }
-        if overwrite {
-            return not_impl_err!("Overwrite not implemented for MemoryTable 
yet");
+        if insert_op != InsertOp::Append {
+            return not_impl_err!("{insert_op} not implemented for MemoryTable 
yet");
         }
         let sink = Arc::new(MemSink::new(self.batches.clone()));
         Ok(Arc::new(DataSinkExec::new(
@@ -638,7 +639,8 @@ mod tests {
         let scan_plan = LogicalPlanBuilder::scan("source", source, 
None)?.build()?;
         // Create an insert plan to insert the source data into the initial 
table
         let insert_into_table =
-            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, 
false)?.build()?;
+            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, 
InsertOp::Append)?
+                .build()?;
         // Create a physical plan from the insert plan
         let plan = session_ctx
             .state()
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs 
b/datafusion/core/src/datasource/physical_plan/mod.rs
index 4018b3bb29..6e8752ccfb 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -36,6 +36,7 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, 
ParquetFileReaderFactor
 pub use arrow_file::ArrowExec;
 pub use avro::AvroExec;
 pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
+use datafusion_expr::dml::InsertOp;
 pub use file_groups::FileGroupPartitioner;
 pub use file_scan_config::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
@@ -83,8 +84,9 @@ pub struct FileSinkConfig {
     /// A vector of column names and their corresponding data types,
     /// representing the partitioning columns for the file
     pub table_partition_cols: Vec<(String, DataType)>,
-    /// Controls whether existing data should be overwritten by this sink
-    pub overwrite: bool,
+    /// Controls how new data should be written to the file, determining 
whether
+    /// to append to, overwrite, or replace records in existing files.
+    pub insert_op: InsertOp,
     /// Controls whether partition columns are kept for the file
     pub keep_partition_by_columns: bool,
 }
diff --git a/datafusion/core/src/datasource/stream.rs 
b/datafusion/core/src/datasource/stream.rs
index d30247e2c6..34023fbbb6 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -33,6 +33,7 @@ use arrow_schema::SchemaRef;
 use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, 
Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::dml::InsertOp;
 use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
 use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
 use datafusion_physical_plan::metrics::MetricsSet;
@@ -350,7 +351,7 @@ impl TableProvider for StreamTable {
         &self,
         _state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
-        _overwrite: bool,
+        _insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let ordering = match self.0.order.first() {
             Some(x) => {
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index b2b912d8ad..520392c9f0 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -71,7 +71,7 @@ use datafusion_common::{
     exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, 
DFSchema,
     ScalarValue,
 };
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyTo, InsertOp};
 use datafusion_expr::expr::{
     physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
 };
@@ -529,7 +529,7 @@ impl DefaultPhysicalPlanner {
                     file_groups: vec![],
                     output_schema: Arc::new(schema),
                     table_partition_cols,
-                    overwrite: false,
+                    insert_op: InsertOp::Append,
                     keep_partition_by_columns,
                 };
 
@@ -542,7 +542,7 @@ impl DefaultPhysicalPlanner {
             }
             LogicalPlan::Dml(DmlStatement {
                 table_name,
-                op: WriteOp::InsertInto,
+                op: WriteOp::Insert(insert_op),
                 ..
             }) => {
                 let name = table_name.table();
@@ -550,23 +550,7 @@ impl DefaultPhysicalPlanner {
                 if let Some(provider) = schema.table(name).await? {
                     let input_exec = children.one()?;
                     provider
-                        .insert_into(session_state, input_exec, false)
-                        .await?
-                } else {
-                    return exec_err!("Table '{table_name}' does not exist");
-                }
-            }
-            LogicalPlan::Dml(DmlStatement {
-                table_name,
-                op: WriteOp::InsertOverwrite,
-                ..
-            }) => {
-                let name = table_name.table();
-                let schema = session_state.schema_for_ref(table_name.clone())?;
-                if let Some(provider) = schema.table(name).await? {
-                    let input_exec = children.one()?;
-                    provider
-                        .insert_into(session_state, input_exec, true)
+                        .insert_into(session_state, input_exec, *insert_op)
                         .await?
                 } else {
                     return exec_err!("Table '{table_name}' does not exist");
diff --git a/datafusion/core/tests/user_defined/insert_operation.rs 
b/datafusion/core/tests/user_defined/insert_operation.rs
new file mode 100644
index 0000000000..ff14fa0be3
--- /dev/null
+++ b/datafusion/core/tests/user_defined/insert_operation.rs
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+    error::Result,
+    prelude::{SessionConfig, SessionContext},
+};
+use datafusion_catalog::{Session, TableProvider};
+use datafusion_expr::{dml::InsertOp, Expr, TableType};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, 
PlanProperties};
+
+#[tokio::test]
+async fn insert_operation_is_passed_correctly_to_table_provider() {
+    // Use the SQLite syntax so we can test the "INSERT OR REPLACE INTO" syntax
+    let ctx = session_ctx_with_dialect("SQLite");
+    let table_provider = Arc::new(TestInsertTableProvider::new());
+    ctx.register_table("testing", table_provider.clone())
+        .unwrap();
+
+    let sql = "INSERT INTO testing (column) VALUES (1)";
+    assert_insert_op(&ctx, sql, InsertOp::Append).await;
+
+    let sql = "INSERT OVERWRITE testing (column) VALUES (1)";
+    assert_insert_op(&ctx, sql, InsertOp::Overwrite).await;
+
+    let sql = "REPLACE INTO testing (column) VALUES (1)";
+    assert_insert_op(&ctx, sql, InsertOp::Replace).await;
+
+    let sql = "INSERT OR REPLACE INTO testing (column) VALUES (1)";
+    assert_insert_op(&ctx, sql, InsertOp::Replace).await;
+}
+
+async fn assert_insert_op(ctx: &SessionContext, sql: &str, insert_op: 
InsertOp) {
+    let df = ctx.sql(sql).await.unwrap();
+    let plan = df.create_physical_plan().await.unwrap();
+    let exec = plan.as_any().downcast_ref::<TestInsertExec>().unwrap();
+    assert_eq!(exec.op, insert_op);
+}
+
+fn session_ctx_with_dialect(dialect: impl Into<String>) -> SessionContext {
+    let mut config = SessionConfig::new();
+    let options = config.options_mut();
+    options.sql_parser.dialect = dialect.into();
+    SessionContext::new_with_config(config)
+}
+
+#[derive(Debug)]
+struct TestInsertTableProvider {
+    schema: SchemaRef,
+}
+
+impl TestInsertTableProvider {
+    fn new() -> Self {
+        Self {
+            schema: SchemaRef::new(Schema::new(vec![Field::new(
+                "column",
+                DataType::Int64,
+                false,
+            )])),
+        }
+    }
+}
+
+#[async_trait]
+impl TableProvider for TestInsertTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        _projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        unimplemented!("TestInsertTableProvider is a stub for testing.")
+    }
+
+    async fn insert_into(
+        &self,
+        _state: &dyn Session,
+        _input: Arc<dyn ExecutionPlan>,
+        insert_op: InsertOp,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(TestInsertExec::new(insert_op)))
+    }
+}
+
+#[derive(Debug)]
+struct TestInsertExec {
+    op: InsertOp,
+    plan_properties: PlanProperties,
+}
+
+impl TestInsertExec {
+    fn new(op: InsertOp) -> Self {
+        let eq_properties = EquivalenceProperties::new(make_count_schema());
+        let plan_properties = PlanProperties::new(
+            eq_properties,
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        );
+        Self {
+            op,
+            plan_properties,
+        }
+    }
+}
+
+impl DisplayAs for TestInsertExec {
+    fn fmt_as(
+        &self,
+        _t: datafusion_physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "TestInsertExec")
+    }
+}
+
+impl ExecutionPlan for TestInsertExec {
+    fn name(&self) -> &str {
+        "TestInsertExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.plan_properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        assert!(children.is_empty());
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<datafusion_execution::TaskContext>,
+    ) -> Result<datafusion_execution::SendableRecordBatchStream> {
+        unimplemented!("TestInsertExec is a stub for testing.")
+    }
+}
+
+fn make_count_schema() -> SchemaRef {
+    Arc::new(Schema::new(vec![Field::new(
+        "count",
+        DataType::UInt64,
+        false,
+    )]))
+}
diff --git a/datafusion/core/tests/user_defined/mod.rs 
b/datafusion/core/tests/user_defined/mod.rs
index 56cec8df46..5d84cdb692 100644
--- a/datafusion/core/tests/user_defined/mod.rs
+++ b/datafusion/core/tests/user_defined/mod.rs
@@ -32,3 +32,6 @@ mod user_defined_table_functions;
 
 /// Tests for Expression Planner
 mod expr_planner;
+
+/// Tests for insert operations
+mod insert_operation;
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index ad96f6a85d..cc8ddf8ec8 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -54,6 +54,7 @@ use datafusion_common::{
     TableReference, ToDFSchema, UnnestOptions,
 };
 
+use super::dml::InsertOp;
 use super::plan::{ColumnUnnestList, ColumnUnnestType};
 
 /// Default table name for unnamed table
@@ -307,20 +308,14 @@ impl LogicalPlanBuilder {
         input: LogicalPlan,
         table_name: impl Into<TableReference>,
         table_schema: &Schema,
-        overwrite: bool,
+        insert_op: InsertOp,
     ) -> Result<Self> {
         let table_schema = table_schema.clone().to_dfschema_ref()?;
 
-        let op = if overwrite {
-            WriteOp::InsertOverwrite
-        } else {
-            WriteOp::InsertInto
-        };
-
         Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
             table_name.into(),
             table_schema,
-            op,
+            WriteOp::Insert(insert_op),
             Arc::new(input),
         ))))
     }
diff --git a/datafusion/expr/src/logical_plan/dml.rs 
b/datafusion/expr/src/logical_plan/dml.rs
index c2ed9dc078..68b3ac41fa 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -146,8 +146,7 @@ impl PartialOrd for DmlStatement {
 
 #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
 pub enum WriteOp {
-    InsertOverwrite,
-    InsertInto,
+    Insert(InsertOp),
     Delete,
     Update,
     Ctas,
@@ -157,8 +156,7 @@ impl WriteOp {
     /// Return a descriptive name of this [`WriteOp`]
     pub fn name(&self) -> &str {
         match self {
-            WriteOp::InsertOverwrite => "Insert Overwrite",
-            WriteOp::InsertInto => "Insert Into",
+            WriteOp::Insert(insert) => insert.name(),
             WriteOp::Delete => "Delete",
             WriteOp::Update => "Update",
             WriteOp::Ctas => "Ctas",
@@ -172,6 +170,37 @@ impl Display for WriteOp {
     }
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
+pub enum InsertOp {
+    /// Appends new rows to the existing table without modifying any
+    /// existing rows. This corresponds to the SQL `INSERT INTO` query.
+    Append,
+    /// Overwrites all existing rows in the table with the new rows.
+    /// This corresponds to the SQL `INSERT OVERWRITE` query.
+    Overwrite,
+    /// If any existing rows collide with the inserted rows (typically based
+    /// on a unique key or primary key), those existing rows are replaced.
+    /// This corresponds to the SQL `REPLACE INTO` query and its equivalents.
+    Replace,
+}
+
+impl InsertOp {
+    /// Return a descriptive name of this [`InsertOp`]
+    pub fn name(&self) -> &str {
+        match self {
+            InsertOp::Append => "Insert Into",
+            InsertOp::Overwrite => "Insert Overwrite",
+            InsertOp::Replace => "Replace Into",
+        }
+    }
+}
+
+impl Display for InsertOp {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.name())
+    }
+}
+
 fn make_count_schema() -> DFSchemaRef {
     Arc::new(
         Schema::new(vec![Field::new("count", DataType::UInt64, false)])
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 1204c843fd..e36c91e7d0 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -731,14 +731,21 @@ message PartitionColumn {
 
 message FileSinkConfig {
   reserved 6; // writer_mode
+  reserved 8; // was `overwrite` which has been superseded by `insert_op`
 
   string object_store_url = 1;
   repeated PartitionedFile file_groups = 2;
   repeated string table_paths = 3;
   datafusion_common.Schema output_schema = 4;
   repeated PartitionColumn table_partition_cols = 5;
-  bool overwrite = 8;
   bool keep_partition_by_columns = 9;
+  InsertOp insert_op = 10;
+}
+
+enum InsertOp {
+  Append = 0;
+  Overwrite = 1;
+  Replace = 2;
 }
 
 message JsonSink {
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 0614e33b7a..004798b3ba 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -5832,10 +5832,10 @@ impl serde::Serialize for FileSinkConfig {
         if !self.table_partition_cols.is_empty() {
             len += 1;
         }
-        if self.overwrite {
+        if self.keep_partition_by_columns {
             len += 1;
         }
-        if self.keep_partition_by_columns {
+        if self.insert_op != 0 {
             len += 1;
         }
         let mut struct_ser = 
serializer.serialize_struct("datafusion.FileSinkConfig", len)?;
@@ -5854,12 +5854,14 @@ impl serde::Serialize for FileSinkConfig {
         if !self.table_partition_cols.is_empty() {
             struct_ser.serialize_field("tablePartitionCols", 
&self.table_partition_cols)?;
         }
-        if self.overwrite {
-            struct_ser.serialize_field("overwrite", &self.overwrite)?;
-        }
         if self.keep_partition_by_columns {
             struct_ser.serialize_field("keepPartitionByColumns", 
&self.keep_partition_by_columns)?;
         }
+        if self.insert_op != 0 {
+            let v = InsertOp::try_from(self.insert_op)
+                .map_err(|_| serde::ser::Error::custom(format!("Invalid 
variant {}", self.insert_op)))?;
+            struct_ser.serialize_field("insertOp", &v)?;
+        }
         struct_ser.end()
     }
 }
@@ -5880,9 +5882,10 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             "outputSchema",
             "table_partition_cols",
             "tablePartitionCols",
-            "overwrite",
             "keep_partition_by_columns",
             "keepPartitionByColumns",
+            "insert_op",
+            "insertOp",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -5892,8 +5895,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             TablePaths,
             OutputSchema,
             TablePartitionCols,
-            Overwrite,
             KeepPartitionByColumns,
+            InsertOp,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> 
std::result::Result<GeneratedField, D::Error>
@@ -5920,8 +5923,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             "tablePaths" | "table_paths" => 
Ok(GeneratedField::TablePaths),
                             "outputSchema" | "output_schema" => 
Ok(GeneratedField::OutputSchema),
                             "tablePartitionCols" | "table_partition_cols" => 
Ok(GeneratedField::TablePartitionCols),
-                            "overwrite" => Ok(GeneratedField::Overwrite),
                             "keepPartitionByColumns" | 
"keep_partition_by_columns" => Ok(GeneratedField::KeepPartitionByColumns),
+                            "insertOp" | "insert_op" => 
Ok(GeneratedField::InsertOp),
                             _ => Err(serde::de::Error::unknown_field(value, 
FIELDS)),
                         }
                     }
@@ -5946,8 +5949,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                 let mut table_paths__ = None;
                 let mut output_schema__ = None;
                 let mut table_partition_cols__ = None;
-                let mut overwrite__ = None;
                 let mut keep_partition_by_columns__ = None;
+                let mut insert_op__ = None;
                 while let Some(k) = map_.next_key()? {
                     match k {
                         GeneratedField::ObjectStoreUrl => {
@@ -5980,18 +5983,18 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             }
                             table_partition_cols__ = Some(map_.next_value()?);
                         }
-                        GeneratedField::Overwrite => {
-                            if overwrite__.is_some() {
-                                return 
Err(serde::de::Error::duplicate_field("overwrite"));
-                            }
-                            overwrite__ = Some(map_.next_value()?);
-                        }
                         GeneratedField::KeepPartitionByColumns => {
                             if keep_partition_by_columns__.is_some() {
                                 return 
Err(serde::de::Error::duplicate_field("keepPartitionByColumns"));
                             }
                             keep_partition_by_columns__ = 
Some(map_.next_value()?);
                         }
+                        GeneratedField::InsertOp => {
+                            if insert_op__.is_some() {
+                                return 
Err(serde::de::Error::duplicate_field("insertOp"));
+                            }
+                            insert_op__ = Some(map_.next_value::<InsertOp>()? 
as i32);
+                        }
                     }
                 }
                 Ok(FileSinkConfig {
@@ -6000,8 +6003,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                     table_paths: table_paths__.unwrap_or_default(),
                     output_schema: output_schema__,
                     table_partition_cols: 
table_partition_cols__.unwrap_or_default(),
-                    overwrite: overwrite__.unwrap_or_default(),
                     keep_partition_by_columns: 
keep_partition_by_columns__.unwrap_or_default(),
+                    insert_op: insert_op__.unwrap_or_default(),
                 })
             }
         }
@@ -7198,6 +7201,80 @@ impl<'de> serde::Deserialize<'de> for InListNode {
         deserializer.deserialize_struct("datafusion.InListNode", FIELDS, 
GeneratedVisitor)
     }
 }
+impl serde::Serialize for InsertOp {
+    #[allow(deprecated)]
+    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let variant = match self {
+            Self::Append => "Append",
+            Self::Overwrite => "Overwrite",
+            Self::Replace => "Replace",
+        };
+        serializer.serialize_str(variant)
+    }
+}
+impl<'de> serde::Deserialize<'de> for InsertOp {
+    #[allow(deprecated)]
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        const FIELDS: &[&str] = &[
+            "Append",
+            "Overwrite",
+            "Replace",
+        ];
+
+        struct GeneratedVisitor;
+
+        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+            type Value = InsertOp;
+
+            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> 
std::fmt::Result {
+                write!(formatter, "expected one of: {:?}", &FIELDS)
+            }
+
+            fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, 
E>
+            where
+                E: serde::de::Error,
+            {
+                i32::try_from(v)
+                    .ok()
+                    .and_then(|x| x.try_into().ok())
+                    .ok_or_else(|| {
+                        
serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self)
+                    })
+            }
+
+            fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, 
E>
+            where
+                E: serde::de::Error,
+            {
+                i32::try_from(v)
+                    .ok()
+                    .and_then(|x| x.try_into().ok())
+                    .ok_or_else(|| {
+                        
serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self)
+                    })
+            }
+
+            fn visit_str<E>(self, value: &str) -> 
std::result::Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                match value {
+                    "Append" => Ok(InsertOp::Append),
+                    "Overwrite" => Ok(InsertOp::Overwrite),
+                    "Replace" => Ok(InsertOp::Replace),
+                    _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
+                }
+            }
+        }
+        deserializer.deserialize_any(GeneratedVisitor)
+    }
+}
 impl serde::Serialize for InterleaveExecNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, 
S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index 21d88e565e..436347330d 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1067,10 +1067,10 @@ pub struct FileSinkConfig {
     pub output_schema: 
::core::option::Option<super::datafusion_common::Schema>,
     #[prost(message, repeated, tag = "5")]
     pub table_partition_cols: ::prost::alloc::vec::Vec<PartitionColumn>,
-    #[prost(bool, tag = "8")]
-    pub overwrite: bool,
     #[prost(bool, tag = "9")]
     pub keep_partition_by_columns: bool,
+    #[prost(enumeration = "InsertOp", tag = "10")]
+    pub insert_op: i32,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct JsonSink {
@@ -1954,6 +1954,35 @@ impl DateUnit {
 }
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, 
::prost::Enumeration)]
 #[repr(i32)]
+pub enum InsertOp {
+    Append = 0,
+    Overwrite = 1,
+    Replace = 2,
+}
+impl InsertOp {
+    /// String value of the enum field names used in the ProtoBuf definition.
+    ///
+    /// The values are not transformed in any way and thus are considered 
stable
+    /// (if the ProtoBuf definition does not change) and safe for programmatic 
use.
+    pub fn as_str_name(&self) -> &'static str {
+        match self {
+            Self::Append => "Append",
+            Self::Overwrite => "Overwrite",
+            Self::Replace => "Replace",
+        }
+    }
+    /// Creates an enum from field names used in the ProtoBuf definition.
+    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
+        match value {
+            "Append" => Some(Self::Append),
+            "Overwrite" => Some(Self::Overwrite),
+            "Replace" => Some(Self::Replace),
+            _ => None,
+        }
+    }
+}
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, 
::prost::Enumeration)]
+#[repr(i32)]
 pub enum PartitionMode {
     CollectLeft = 0,
     Partitioned = 1,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index b2f92f4b2e..20ec5eeaea 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 
 use arrow::compute::SortOptions;
 use chrono::{TimeZone, Utc};
+use datafusion_expr::dml::InsertOp;
 use object_store::path::Path;
 use object_store::ObjectMeta;
 
@@ -640,13 +641,18 @@ impl TryFrom<&protobuf::FileSinkConfig> for 
FileSinkConfig {
                 Ok((name.clone(), data_type))
             })
             .collect::<Result<Vec<_>>>()?;
+        let insert_op = match conf.insert_op() {
+            protobuf::InsertOp::Append => InsertOp::Append,
+            protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
+            protobuf::InsertOp::Replace => InsertOp::Replace,
+        };
         Ok(Self {
             object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
             file_groups,
             table_paths,
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
             table_partition_cols,
-            overwrite: conf.overwrite,
+            insert_op,
             keep_partition_by_columns: conf.keep_partition_by_columns,
         })
     }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 6981c77228..6f6065a1c2 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -642,8 +642,8 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
             table_paths,
             output_schema: Some(conf.output_schema.as_ref().try_into()?),
             table_partition_cols,
-            overwrite: conf.overwrite,
             keep_partition_by_columns: conf.keep_partition_by_columns,
+            insert_op: conf.insert_op as i32,
         })
     }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index db84a08e5b..025676f790 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -27,6 +27,7 @@ use arrow::csv::WriterBuilder;
 use arrow::datatypes::{Fields, TimeUnit};
 use datafusion::physical_expr::aggregate::AggregateExprBuilder;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_expr::dml::InsertOp;
 use 
datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf;
 use datafusion_functions_aggregate::array_agg::array_agg_udaf;
 use datafusion_functions_aggregate::min_max::max_udaf;
@@ -1143,7 +1144,7 @@ fn roundtrip_json_sink() -> Result<()> {
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
-        overwrite: true,
+        insert_op: InsertOp::Overwrite,
         keep_partition_by_columns: true,
     };
     let data_sink = Arc::new(JsonSink::new(
@@ -1179,7 +1180,7 @@ fn roundtrip_csv_sink() -> Result<()> {
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
-        overwrite: true,
+        insert_op: InsertOp::Overwrite,
         keep_partition_by_columns: true,
     };
     let data_sink = Arc::new(CsvSink::new(
@@ -1238,7 +1239,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
-        overwrite: true,
+        insert_op: InsertOp::Overwrite,
         keep_partition_by_columns: true,
     };
     let data_sink = Arc::new(ParquetSink::new(
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 29dfe25993..895285c597 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -37,7 +37,7 @@ use datafusion_common::{
     DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, 
TableReference,
     ToDFSchema,
 };
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyTo, InsertOp};
 use 
datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
 use datafusion_expr::logical_plan::builder::project;
 use datafusion_expr::logical_plan::DdlStatement;
@@ -53,7 +53,7 @@ use datafusion_expr::{
     TransactionConclusion, TransactionEnd, TransactionIsolationLevel, 
TransactionStart,
     Volatility, WriteOp,
 };
-use sqlparser::ast;
+use sqlparser::ast::{self, SqliteOnConflict};
 use sqlparser::ast::{
     Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
     CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable, 
Ident, Insert,
@@ -665,12 +665,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 returning,
                 ignore,
                 table_alias,
-                replace_into,
+                mut replace_into,
                 priority,
                 insert_alias,
             }) => {
-                if or.is_some() {
-                    plan_err!("Inserts with or clauses not supported")?;
+                if let Some(or) = or {
+                    match or {
+                        SqliteOnConflict::Replace => replace_into = true,
+                        _ => plan_err!("Inserts with {or} clause is not 
supported")?,
+                    }
                 }
                 if partitioned.is_some() {
                     plan_err!("Partitioned inserts not yet supported")?;
@@ -698,9 +701,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                         "Inserts with a table alias not supported: 
{table_alias:?}"
                     )?
                 };
-                if replace_into {
-                    plan_err!("Inserts with a `REPLACE INTO` clause not 
supported")?
-                };
                 if let Some(priority) = priority {
                     plan_err!(
                         "Inserts with a `PRIORITY` clause not supported: 
{priority:?}"
@@ -710,7 +710,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     plan_err!("Inserts with an alias not supported")?;
                 }
                 let _ = into; // optional keyword doesn't change behavior
-                self.insert_to_plan(table_name, columns, source, overwrite)
+                self.insert_to_plan(table_name, columns, source, overwrite, 
replace_into)
             }
             Statement::Update {
                 table,
@@ -1605,6 +1605,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         columns: Vec<Ident>,
         source: Box<Query>,
         overwrite: bool,
+        replace_into: bool,
     ) -> Result<LogicalPlan> {
         // Do a table lookup to verify the table exists
         let table_name = self.object_name_to_table_reference(table_name)?;
@@ -1707,16 +1708,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             .collect::<Result<Vec<Expr>>>()?;
         let source = project(source, exprs)?;
 
-        let op = if overwrite {
-            WriteOp::InsertOverwrite
-        } else {
-            WriteOp::InsertInto
+        let insert_op = match (overwrite, replace_into) {
+            (false, false) => InsertOp::Append,
+            (true, false) => InsertOp::Overwrite,
+            (false, true) => InsertOp::Replace,
+            (true, true) => plan_err!("Conflicting insert operations: 
`overwrite` and `replace_into` cannot both be true")?,
         };
 
         let plan = LogicalPlan::Dml(DmlStatement::new(
             table_name,
             Arc::new(table_schema),
-            op,
+            WriteOp::Insert(insert_op),
             Arc::new(source),
         ));
         Ok(plan)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to