This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3892499b28 Support REPLACE INTO for INSERT statements (#12516)
3892499b28 is described below
commit 3892499b28135f9862ffac3e4f19524bb5b77c59
Author: Fredrik Meringdal <[email protected]>
AuthorDate: Sat Sep 28 18:08:37 2024 +0200
Support REPLACE INTO for INSERT statements (#12516)
* Add support for REPLACE INTO statements.
This commit introduces an `InsertOp` enum to replace the boolean
`overwrite` flag, providing clearer and more flexible control over how
data is inserted. The following APIs and configs are updated
accordingly: `TableProvider::insert_into`, `FileSinkConfig`, and
`DataFrameWriteOptions` (see the sketch after the commit message).
* fix clippy and add license
* Update vendored code
---------
Co-authored-by: Andrew Lamb <[email protected]>
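
For downstream users the change amounts to swapping a bool for an enum. Below is a minimal sketch of the new write path, assuming only the APIs shown in the diff; `write_with_op` is a hypothetical helper, not part of this commit:

    use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
    use datafusion::error::Result;
    use datafusion_expr::dml::InsertOp;

    /// Hypothetical helper: write `df` into `table` with explicit insert semantics.
    async fn write_with_op(df: DataFrame, table: &str, op: InsertOp) -> Result<()> {
        // `with_insert_operation` supersedes the removed `with_overwrite(bool)`.
        let options = DataFrameWriteOptions::new().with_insert_operation(op);
        // The sink returns a single count batch; it is ignored here.
        let _ = df.write_table(table, options).await?;
        Ok(())
    }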
---
datafusion/catalog/src/table.rs | 3 +-
datafusion/core/src/dataframe/mod.rs | 35 ++--
datafusion/core/src/dataframe/parquet.rs | 10 +-
.../core/src/datasource/file_format/arrow.rs | 3 +-
datafusion/core/src/datasource/file_format/csv.rs | 3 +-
datafusion/core/src/datasource/file_format/json.rs | 3 +-
.../core/src/datasource/file_format/parquet.rs | 9 +-
datafusion/core/src/datasource/listing/table.rs | 8 +-
datafusion/core/src/datasource/memory.rs | 10 +-
.../core/src/datasource/physical_plan/mod.rs | 6 +-
datafusion/core/src/datasource/stream.rs | 3 +-
datafusion/core/src/physical_planner.rs | 24 +--
.../core/tests/user_defined/insert_operation.rs | 188 +++++++++++++++++++++
datafusion/core/tests/user_defined/mod.rs | 3 +
datafusion/expr/src/logical_plan/builder.rs | 11 +-
datafusion/expr/src/logical_plan/dml.rs | 37 +++-
datafusion/proto/proto/datafusion.proto | 9 +-
datafusion/proto/src/generated/pbjson.rs | 109 ++++++++++--
datafusion/proto/src/generated/prost.rs | 33 +++-
datafusion/proto/src/physical_plan/from_proto.rs | 8 +-
datafusion/proto/src/physical_plan/to_proto.rs | 2 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 7 +-
datafusion/sql/src/statement.rs | 30 ++--
23 files changed, 447 insertions(+), 107 deletions(-)
diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs
index 6c36d907ac..ca3a2bef88 100644
--- a/datafusion/catalog/src/table.rs
+++ b/datafusion/catalog/src/table.rs
@@ -25,6 +25,7 @@ use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_common::{not_impl_err, Constraints, Statistics};
+use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
CreateExternalTable, Expr, LogicalPlan, TableProviderFilterPushDown, TableType,
};
@@ -274,7 +275,7 @@ pub trait TableProvider: Debug + Sync + Send {
&self,
_state: &dyn Session,
_input: Arc<dyn ExecutionPlan>,
- _overwrite: bool,
+ _insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Insert into not implemented for this table")
}
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 72b763ce0f..70c5075114 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -52,6 +52,7 @@ use datafusion_common::config::{CsvOptions, JsonOptions};
use datafusion_common::{
plan_err, Column, DFSchema, DataFusionError, ParamValues, SchemaError, UnnestOptions,
};
+use datafusion_expr::dml::InsertOp;
use datafusion_expr::{case, is_null, lit, SortExpr};
use datafusion_expr::{
utils::COUNT_STAR_EXPANSION, TableProviderFilterPushDown, UNNAMED_TABLE,
@@ -66,8 +67,9 @@ use datafusion_catalog::Session;
/// Contains options that control how data is
/// written out from a DataFrame
pub struct DataFrameWriteOptions {
- /// Controls if existing data should be overwritten
- overwrite: bool,
+ /// Controls how new data should be written to the table, determining whether
+ /// to append, overwrite, or replace existing data.
+ insert_op: InsertOp,
/// Controls if all partitions should be coalesced into a single output file
/// Generally will have slower performance when set to true.
single_file_output: bool,
@@ -80,14 +82,15 @@ impl DataFrameWriteOptions {
/// Create a new DataFrameWriteOptions with default values
pub fn new() -> Self {
DataFrameWriteOptions {
- overwrite: false,
+ insert_op: InsertOp::Append,
single_file_output: false,
partition_by: vec![],
}
}
- /// Set the overwrite option to true or false
- pub fn with_overwrite(mut self, overwrite: bool) -> Self {
- self.overwrite = overwrite;
+
+ /// Set the insert operation
+ pub fn with_insert_operation(mut self, insert_op: InsertOp) -> Self {
+ self.insert_op = insert_op;
self
}
@@ -1525,7 +1528,7 @@ impl DataFrame {
self.plan,
table_name.to_owned(),
&arrow_schema,
- write_options.overwrite,
+ write_options.insert_op,
)?
.build()?;
@@ -1566,10 +1569,11 @@ impl DataFrame {
options: DataFrameWriteOptions,
writer_options: Option<CsvOptions>,
) -> Result<Vec<RecordBatch>, DataFusionError> {
- if options.overwrite {
- return Err(DataFusionError::NotImplemented(
- "Overwrites are not implemented for
DataFrame::write_csv.".to_owned(),
- ));
+ if options.insert_op != InsertOp::Append {
+ return Err(DataFusionError::NotImplemented(format!(
+ "{} is not implemented for DataFrame::write_csv.",
+ options.insert_op
+ )));
}
let format = if let Some(csv_opts) = writer_options {
@@ -1626,10 +1630,11 @@ impl DataFrame {
options: DataFrameWriteOptions,
writer_options: Option<JsonOptions>,
) -> Result<Vec<RecordBatch>, DataFusionError> {
- if options.overwrite {
- return Err(DataFusionError::NotImplemented(
- "Overwrites are not implemented for
DataFrame::write_json.".to_owned(),
- ));
+ if options.insert_op != InsertOp::Append {
+ return Err(DataFusionError::NotImplemented(format!(
+ "{} is not implemented for DataFrame::write_json.",
+ options.insert_op
+ )));
}
let format = if let Some(json_opts) = writer_options {
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 66974e37f4..f90b35fde6 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -26,6 +26,7 @@ use super::{
};
use datafusion_common::config::TableParquetOptions;
+use datafusion_expr::dml::InsertOp;
impl DataFrame {
/// Execute the `DataFrame` and write the results to Parquet file(s).
@@ -57,10 +58,11 @@ impl DataFrame {
options: DataFrameWriteOptions,
writer_options: Option<TableParquetOptions>,
) -> Result<Vec<RecordBatch>, DataFusionError> {
- if options.overwrite {
- return Err(DataFusionError::NotImplemented(
- "Overwrites are not implemented for
DataFrame::write_parquet.".to_owned(),
- ));
+ if options.insert_op != InsertOp::Append {
+ return Err(DataFusionError::NotImplemented(format!(
+ "{} is not implemented for DataFrame::write_parquet.",
+ options.insert_op
+ )));
}
let format = if let Some(parquet_opts) = writer_options {
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 6ee4280956..c10ebbd6c9 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -47,6 +47,7 @@ use datafusion_common::{
not_impl_err, DataFusionError, GetExt, Statistics, DEFAULT_ARROW_EXTENSION,
};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
@@ -181,7 +182,7 @@ impl FileFormat for ArrowFormat {
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if conf.overwrite {
+ if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not implemented yet for Arrow
format");
}
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 99e8f13776..e821fa806f 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -46,6 +46,7 @@ use datafusion_common::{
exec_err, not_impl_err, DataFusionError, GetExt, DEFAULT_CSV_EXTENSION,
};
use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;
@@ -382,7 +383,7 @@ impl FileFormat for CsvFormat {
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if conf.overwrite {
+ if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not implemented yet for CSV");
}
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 4471d7d6cb..c9ed0c0d28 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -46,6 +46,7 @@ use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions};
use datafusion_common::file_options::json_writer::JsonWriterOptions;
use datafusion_common::{not_impl_err, GetExt, DEFAULT_JSON_EXTENSION};
use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::MetricsSet;
use datafusion_physical_plan::ExecutionPlan;
@@ -252,7 +253,7 @@ impl FileFormat for JsonFormat {
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if conf.overwrite {
+ if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not implemented yet for
Json");
}
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 35296b0d79..98ae0ce14b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -53,6 +53,7 @@ use datafusion_common::{
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
use datafusion_expr::Expr;
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_expr::PhysicalExpr;
@@ -403,7 +404,7 @@ impl FileFormat for ParquetFormat {
conf: FileSinkConfig,
order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
- if conf.overwrite {
+ if conf.insert_op != InsertOp::Append {
return not_impl_err!("Overwrites are not implemented yet for
Parquet");
}
@@ -2269,7 +2270,7 @@ mod tests {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![],
- overwrite: true,
+ insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
@@ -2364,7 +2365,7 @@ mod tests {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("a".to_string(), DataType::Utf8)], //
add partitioning
- overwrite: true,
+ insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
@@ -2447,7 +2448,7 @@ mod tests {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![],
- overwrite: true,
+ insert_op: InsertOp::Overwrite,
keep_partition_by_columns: false,
};
let parquet_sink = Arc::new(ParquetSink::new(
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 2a35fddeb0..3eb8eed9de 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -34,6 +34,7 @@ use crate::datasource::{
use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
+use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics};
@@ -916,7 +917,7 @@ impl TableProvider for ListingTable {
&self,
state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
- overwrite: bool,
+ insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
// Check that the schema of the plan matches the schema of this table.
if !self
@@ -975,7 +976,7 @@ impl TableProvider for ListingTable {
file_groups,
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
- overwrite,
+ insert_op,
keep_partition_by_columns,
};
@@ -1990,7 +1991,8 @@ mod tests {
// Therefore, we will have 8 partitions in the final plan.
// Create an insert plan to insert the source data into the initial table
let insert_into_table =
- LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
+ LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
+     .build()?;
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
diff --git a/datafusion/core/src/datasource/memory.rs
b/datafusion/core/src/datasource/memory.rs
index 70f3c36b81..24a4938e7b 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -39,6 +39,7 @@ use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
use datafusion_execution::TaskContext;
+use datafusion_expr::dml::InsertOp;
use datafusion_physical_plan::metrics::MetricsSet;
use async_trait::async_trait;
@@ -262,7 +263,7 @@ impl TableProvider for MemTable {
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
- overwrite: bool,
+ insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
// If we are inserting into the table, any sort order may be messed up so reset it here
*self.sort_order.lock() = vec![];
@@ -289,8 +290,8 @@ impl TableProvider for MemTable {
.collect::<Vec<_>>()
);
}
- if overwrite {
- return not_impl_err!("Overwrite not implemented for MemoryTable
yet");
+ if insert_op != InsertOp::Append {
+ return not_impl_err!("{insert_op} not implemented for MemoryTable
yet");
}
let sink = Arc::new(MemSink::new(self.batches.clone()));
Ok(Arc::new(DataSinkExec::new(
@@ -638,7 +639,8 @@ mod tests {
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
// Create an insert plan to insert the source data into the initial table
let insert_into_table =
- LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
+ LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?
+     .build()?;
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 4018b3bb29..6e8752ccfb 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -36,6 +36,7 @@ pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactor
pub use arrow_file::ArrowExec;
pub use avro::AvroExec;
pub use csv::{CsvConfig, CsvExec, CsvExecBuilder, CsvOpener};
+use datafusion_expr::dml::InsertOp;
pub use file_groups::FileGroupPartitioner;
pub use file_scan_config::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
@@ -83,8 +84,9 @@ pub struct FileSinkConfig {
/// A vector of column names and their corresponding data types,
/// representing the partitioning columns for the file
pub table_partition_cols: Vec<(String, DataType)>,
- /// Controls whether existing data should be overwritten by this sink
- pub overwrite: bool,
+ /// Controls how new data should be written to the file, determining whether
+ /// to append to, overwrite, or replace records in existing files.
+ pub insert_op: InsertOp,
/// Controls whether partition columns are kept for the file
pub keep_partition_by_columns: bool,
}
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index d30247e2c6..34023fbbb6 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -33,6 +33,7 @@ use arrow_schema::SchemaRef;
use datafusion_common::{config_err, plan_err, Constraints, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::dml::InsertOp;
use datafusion_expr::{CreateExternalTable, Expr, SortExpr, TableType};
use datafusion_physical_plan::insert::{DataSink, DataSinkExec};
use datafusion_physical_plan::metrics::MetricsSet;
@@ -350,7 +351,7 @@ impl TableProvider for StreamTable {
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
- _overwrite: bool,
+ _insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
let ordering = match self.0.order.first() {
Some(x) => {
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index b2b912d8ad..520392c9f0 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -71,7 +71,7 @@ use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
ScalarValue,
};
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr::{
physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
};
@@ -529,7 +529,7 @@ impl DefaultPhysicalPlanner {
file_groups: vec![],
output_schema: Arc::new(schema),
table_partition_cols,
- overwrite: false,
+ insert_op: InsertOp::Append,
keep_partition_by_columns,
};
@@ -542,7 +542,7 @@ impl DefaultPhysicalPlanner {
}
LogicalPlan::Dml(DmlStatement {
table_name,
- op: WriteOp::InsertInto,
+ op: WriteOp::Insert(insert_op),
..
}) => {
let name = table_name.table();
@@ -550,23 +550,7 @@ impl DefaultPhysicalPlanner {
if let Some(provider) = schema.table(name).await? {
let input_exec = children.one()?;
provider
- .insert_into(session_state, input_exec, false)
- .await?
- } else {
- return exec_err!("Table '{table_name}' does not exist");
- }
- }
- LogicalPlan::Dml(DmlStatement {
- table_name,
- op: WriteOp::InsertOverwrite,
- ..
- }) => {
- let name = table_name.table();
- let schema = session_state.schema_for_ref(table_name.clone())?;
- if let Some(provider) = schema.table(name).await? {
- let input_exec = children.one()?;
- provider
- .insert_into(session_state, input_exec, true)
+ .insert_into(session_state, input_exec, *insert_op)
.await?
} else {
return exec_err!("Table '{table_name}' does not exist");
diff --git a/datafusion/core/tests/user_defined/insert_operation.rs b/datafusion/core/tests/user_defined/insert_operation.rs
new file mode 100644
index 0000000000..ff14fa0be3
--- /dev/null
+++ b/datafusion/core/tests/user_defined/insert_operation.rs
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, sync::Arc};
+
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::{
+ error::Result,
+ prelude::{SessionConfig, SessionContext},
+};
+use datafusion_catalog::{Session, TableProvider};
+use datafusion_expr::{dml::InsertOp, Expr, TableType};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_plan::{DisplayAs, ExecutionMode, ExecutionPlan, PlanProperties};
+
+#[tokio::test]
+async fn insert_operation_is_passed_correctly_to_table_provider() {
+ // Use the SQLite dialect so we can test the "INSERT OR REPLACE INTO" syntax
+ let ctx = session_ctx_with_dialect("SQLite");
+ let table_provider = Arc::new(TestInsertTableProvider::new());
+ ctx.register_table("testing", table_provider.clone())
+ .unwrap();
+
+ let sql = "INSERT INTO testing (column) VALUES (1)";
+ assert_insert_op(&ctx, sql, InsertOp::Append).await;
+
+ let sql = "INSERT OVERWRITE testing (column) VALUES (1)";
+ assert_insert_op(&ctx, sql, InsertOp::Overwrite).await;
+
+ let sql = "REPLACE INTO testing (column) VALUES (1)";
+ assert_insert_op(&ctx, sql, InsertOp::Replace).await;
+
+ let sql = "INSERT OR REPLACE INTO testing (column) VALUES (1)";
+ assert_insert_op(&ctx, sql, InsertOp::Replace).await;
+}
+
+async fn assert_insert_op(ctx: &SessionContext, sql: &str, insert_op: InsertOp) {
+ let df = ctx.sql(sql).await.unwrap();
+ let plan = df.create_physical_plan().await.unwrap();
+ let exec = plan.as_any().downcast_ref::<TestInsertExec>().unwrap();
+ assert_eq!(exec.op, insert_op);
+}
+
+fn session_ctx_with_dialect(dialect: impl Into<String>) -> SessionContext {
+ let mut config = SessionConfig::new();
+ let options = config.options_mut();
+ options.sql_parser.dialect = dialect.into();
+ SessionContext::new_with_config(config)
+}
+
+#[derive(Debug)]
+struct TestInsertTableProvider {
+ schema: SchemaRef,
+}
+
+impl TestInsertTableProvider {
+ fn new() -> Self {
+ Self {
+ schema: SchemaRef::new(Schema::new(vec![Field::new(
+ "column",
+ DataType::Int64,
+ false,
+ )])),
+ }
+ }
+}
+
+#[async_trait]
+impl TableProvider for TestInsertTableProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ _projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ unimplemented!("TestInsertTableProvider is a stub for testing.")
+ }
+
+ async fn insert_into(
+ &self,
+ _state: &dyn Session,
+ _input: Arc<dyn ExecutionPlan>,
+ insert_op: InsertOp,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(TestInsertExec::new(insert_op)))
+ }
+}
+
+#[derive(Debug)]
+struct TestInsertExec {
+ op: InsertOp,
+ plan_properties: PlanProperties,
+}
+
+impl TestInsertExec {
+ fn new(op: InsertOp) -> Self {
+ let eq_properties = EquivalenceProperties::new(make_count_schema());
+ let plan_properties = PlanProperties::new(
+ eq_properties,
+ Partitioning::UnknownPartitioning(1),
+ ExecutionMode::Bounded,
+ );
+ Self {
+ op,
+ plan_properties,
+ }
+ }
+}
+
+impl DisplayAs for TestInsertExec {
+ fn fmt_as(
+ &self,
+ _t: datafusion_physical_plan::DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(f, "TestInsertExec")
+ }
+}
+
+impl ExecutionPlan for TestInsertExec {
+ fn name(&self) -> &str {
+ "TestInsertExec"
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn properties(&self) -> &PlanProperties {
+ &self.plan_properties
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ assert!(children.is_empty());
+ Ok(self)
+ }
+
+ fn execute(
+ &self,
+ _partition: usize,
+ _context: Arc<datafusion_execution::TaskContext>,
+ ) -> Result<datafusion_execution::SendableRecordBatchStream> {
+ unimplemented!("TestInsertExec is a stub for testing.")
+ }
+}
+
+fn make_count_schema() -> SchemaRef {
+ Arc::new(Schema::new(vec![Field::new(
+ "count",
+ DataType::UInt64,
+ false,
+ )]))
+}
diff --git a/datafusion/core/tests/user_defined/mod.rs b/datafusion/core/tests/user_defined/mod.rs
index 56cec8df46..5d84cdb692 100644
--- a/datafusion/core/tests/user_defined/mod.rs
+++ b/datafusion/core/tests/user_defined/mod.rs
@@ -32,3 +32,6 @@ mod user_defined_table_functions;
/// Tests for Expression Planner
mod expr_planner;
+
+/// Tests for insert operations
+mod insert_operation;
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index ad96f6a85d..cc8ddf8ec8 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -54,6 +54,7 @@ use datafusion_common::{
TableReference, ToDFSchema, UnnestOptions,
};
+use super::dml::InsertOp;
use super::plan::{ColumnUnnestList, ColumnUnnestType};
/// Default table name for unnamed table
@@ -307,20 +308,14 @@ impl LogicalPlanBuilder {
input: LogicalPlan,
table_name: impl Into<TableReference>,
table_schema: &Schema,
- overwrite: bool,
+ insert_op: InsertOp,
) -> Result<Self> {
let table_schema = table_schema.clone().to_dfschema_ref()?;
- let op = if overwrite {
- WriteOp::InsertOverwrite
- } else {
- WriteOp::InsertInto
- };
-
Ok(Self::new(LogicalPlan::Dml(DmlStatement::new(
table_name.into(),
table_schema,
- op,
+ WriteOp::Insert(insert_op),
Arc::new(input),
))))
}
diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs
index c2ed9dc078..68b3ac41fa 100644
--- a/datafusion/expr/src/logical_plan/dml.rs
+++ b/datafusion/expr/src/logical_plan/dml.rs
@@ -146,8 +146,7 @@ impl PartialOrd for DmlStatement {
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WriteOp {
- InsertOverwrite,
- InsertInto,
+ Insert(InsertOp),
Delete,
Update,
Ctas,
@@ -157,8 +156,7 @@ impl WriteOp {
/// Return a descriptive name of this [`WriteOp`]
pub fn name(&self) -> &str {
match self {
- WriteOp::InsertOverwrite => "Insert Overwrite",
- WriteOp::InsertInto => "Insert Into",
+ WriteOp::Insert(insert) => insert.name(),
WriteOp::Delete => "Delete",
WriteOp::Update => "Update",
WriteOp::Ctas => "Ctas",
@@ -172,6 +170,37 @@ impl Display for WriteOp {
}
}
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
+pub enum InsertOp {
+ /// Appends new rows to the existing table without modifying any
+ /// existing rows. This corresponds to the SQL `INSERT INTO` query.
+ Append,
+ /// Overwrites all existing rows in the table with the new rows.
+ /// This corresponds to the SQL `INSERT OVERWRITE` query.
+ Overwrite,
+ /// If any existing rows collide with the inserted rows (typically based
+ /// on a unique key or primary key), those existing rows are replaced.
+ /// This corresponds to the SQL `REPLACE INTO` query and its equivalents.
+ Replace,
+}
+
+impl InsertOp {
+ /// Return a descriptive name of this [`InsertOp`]
+ pub fn name(&self) -> &str {
+ match self {
+ InsertOp::Append => "Insert Into",
+ InsertOp::Overwrite => "Insert Overwrite",
+ InsertOp::Replace => "Replace Into",
+ }
+ }
+}
+
+impl Display for InsertOp {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "{}", self.name())
+ }
+}
+
fn make_count_schema() -> DFSchemaRef {
Arc::new(
Schema::new(vec![Field::new("count", DataType::UInt64, false)])
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 1204c843fd..e36c91e7d0 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -731,14 +731,21 @@ message PartitionColumn {
message FileSinkConfig {
reserved 6; // writer_mode
+ reserved 8; // was `overwrite` which has been superseded by `insert_op`
string object_store_url = 1;
repeated PartitionedFile file_groups = 2;
repeated string table_paths = 3;
datafusion_common.Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
- bool overwrite = 8;
bool keep_partition_by_columns = 9;
+ InsertOp insert_op = 10;
+}
+
+enum InsertOp {
+ Append = 0;
+ Overwrite = 1;
+ Replace = 2;
}
message JsonSink {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 0614e33b7a..004798b3ba 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -5832,10 +5832,10 @@ impl serde::Serialize for FileSinkConfig {
if !self.table_partition_cols.is_empty() {
len += 1;
}
- if self.overwrite {
+ if self.keep_partition_by_columns {
len += 1;
}
- if self.keep_partition_by_columns {
+ if self.insert_op != 0 {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("datafusion.FileSinkConfig", len)?;
@@ -5854,12 +5854,14 @@ impl serde::Serialize for FileSinkConfig {
if !self.table_partition_cols.is_empty() {
struct_ser.serialize_field("tablePartitionCols",
&self.table_partition_cols)?;
}
- if self.overwrite {
- struct_ser.serialize_field("overwrite", &self.overwrite)?;
- }
if self.keep_partition_by_columns {
struct_ser.serialize_field("keepPartitionByColumns",
&self.keep_partition_by_columns)?;
}
+ if self.insert_op != 0 {
+ let v = InsertOp::try_from(self.insert_op)
+ .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.insert_op)))?;
+ struct_ser.serialize_field("insertOp", &v)?;
+ }
struct_ser.end()
}
}
@@ -5880,9 +5882,10 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
"outputSchema",
"table_partition_cols",
"tablePartitionCols",
- "overwrite",
"keep_partition_by_columns",
"keepPartitionByColumns",
+ "insert_op",
+ "insertOp",
];
#[allow(clippy::enum_variant_names)]
@@ -5892,8 +5895,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
TablePaths,
OutputSchema,
TablePartitionCols,
- Overwrite,
KeepPartitionByColumns,
+ InsertOp,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -5920,8 +5923,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
"tablePaths" | "table_paths" =>
Ok(GeneratedField::TablePaths),
"outputSchema" | "output_schema" =>
Ok(GeneratedField::OutputSchema),
"tablePartitionCols" | "table_partition_cols" =>
Ok(GeneratedField::TablePartitionCols),
- "overwrite" => Ok(GeneratedField::Overwrite),
"keepPartitionByColumns" |
"keep_partition_by_columns" => Ok(GeneratedField::KeepPartitionByColumns),
+ "insertOp" | "insert_op" =>
Ok(GeneratedField::InsertOp),
_ => Err(serde::de::Error::unknown_field(value,
FIELDS)),
}
}
@@ -5946,8 +5949,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
let mut table_paths__ = None;
let mut output_schema__ = None;
let mut table_partition_cols__ = None;
- let mut overwrite__ = None;
let mut keep_partition_by_columns__ = None;
+ let mut insert_op__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::ObjectStoreUrl => {
@@ -5980,18 +5983,18 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
}
table_partition_cols__ = Some(map_.next_value()?);
}
- GeneratedField::Overwrite => {
- if overwrite__.is_some() {
- return Err(serde::de::Error::duplicate_field("overwrite"));
- }
- overwrite__ = Some(map_.next_value()?);
- }
GeneratedField::KeepPartitionByColumns => {
if keep_partition_by_columns__.is_some() {
return Err(serde::de::Error::duplicate_field("keepPartitionByColumns"));
}
keep_partition_by_columns__ = Some(map_.next_value()?);
}
+ GeneratedField::InsertOp => {
+ if insert_op__.is_some() {
+ return Err(serde::de::Error::duplicate_field("insertOp"));
+ }
+ insert_op__ = Some(map_.next_value::<InsertOp>()? as i32);
+ }
}
}
Ok(FileSinkConfig {
@@ -6000,8 +6003,8 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
table_paths: table_paths__.unwrap_or_default(),
output_schema: output_schema__,
table_partition_cols: table_partition_cols__.unwrap_or_default(),
- overwrite: overwrite__.unwrap_or_default(),
keep_partition_by_columns: keep_partition_by_columns__.unwrap_or_default(),
+ insert_op: insert_op__.unwrap_or_default(),
})
}
}
@@ -7198,6 +7201,80 @@ impl<'de> serde::Deserialize<'de> for InListNode {
deserializer.deserialize_struct("datafusion.InListNode", FIELDS,
GeneratedVisitor)
}
}
+impl serde::Serialize for InsertOp {
+ #[allow(deprecated)]
+ fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ let variant = match self {
+ Self::Append => "Append",
+ Self::Overwrite => "Overwrite",
+ Self::Replace => "Replace",
+ };
+ serializer.serialize_str(variant)
+ }
+}
+impl<'de> serde::Deserialize<'de> for InsertOp {
+ #[allow(deprecated)]
+ fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ const FIELDS: &[&str] = &[
+ "Append",
+ "Overwrite",
+ "Replace",
+ ];
+
+ struct GeneratedVisitor;
+
+ impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
+ type Value = InsertOp;
+
+ fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(formatter, "expected one of: {:?}", &FIELDS)
+ }
+
+ fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ i32::try_from(v)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_else(|| {
+ serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self)
+ })
+ }
+
+ fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ i32::try_from(v)
+ .ok()
+ .and_then(|x| x.try_into().ok())
+ .ok_or_else(|| {
+ serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self)
+ })
+ }
+
+ fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
+ where
+ E: serde::de::Error,
+ {
+ match value {
+ "Append" => Ok(InsertOp::Append),
+ "Overwrite" => Ok(InsertOp::Overwrite),
+ "Replace" => Ok(InsertOp::Replace),
+ _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
+ }
+ }
+ }
+ deserializer.deserialize_any(GeneratedVisitor)
+ }
+}
impl serde::Serialize for InterleaveExecNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 21d88e565e..436347330d 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1067,10 +1067,10 @@ pub struct FileSinkConfig {
pub output_schema: ::core::option::Option<super::datafusion_common::Schema>,
#[prost(message, repeated, tag = "5")]
pub table_partition_cols: ::prost::alloc::vec::Vec<PartitionColumn>,
- #[prost(bool, tag = "8")]
- pub overwrite: bool,
#[prost(bool, tag = "9")]
pub keep_partition_by_columns: bool,
+ #[prost(enumeration = "InsertOp", tag = "10")]
+ pub insert_op: i32,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct JsonSink {
@@ -1954,6 +1954,35 @@ impl DateUnit {
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
+pub enum InsertOp {
+ Append = 0,
+ Overwrite = 1,
+ Replace = 2,
+}
+impl InsertOp {
+ /// String value of the enum field names used in the ProtoBuf definition.
+ ///
+ /// The values are not transformed in any way and thus are considered stable
+ /// (if the ProtoBuf definition does not change) and safe for programmatic use.
+ pub fn as_str_name(&self) -> &'static str {
+ match self {
+ Self::Append => "Append",
+ Self::Overwrite => "Overwrite",
+ Self::Replace => "Replace",
+ }
+ }
+ /// Creates an enum from field names used in the ProtoBuf definition.
+ pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
+ match value {
+ "Append" => Some(Self::Append),
+ "Overwrite" => Some(Self::Overwrite),
+ "Replace" => Some(Self::Replace),
+ _ => None,
+ }
+ }
+}
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
+#[repr(i32)]
pub enum PartitionMode {
CollectLeft = 0,
Partitioned = 1,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index b2f92f4b2e..20ec5eeaea 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
use arrow::compute::SortOptions;
use chrono::{TimeZone, Utc};
+use datafusion_expr::dml::InsertOp;
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -640,13 +641,18 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
Ok((name.clone(), data_type))
})
.collect::<Result<Vec<_>>>()?;
+ let insert_op = match conf.insert_op() {
+ protobuf::InsertOp::Append => InsertOp::Append,
+ protobuf::InsertOp::Overwrite => InsertOp::Overwrite,
+ protobuf::InsertOp::Replace => InsertOp::Replace,
+ };
Ok(Self {
object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
file_groups,
table_paths,
output_schema: Arc::new(convert_required!(conf.output_schema)?),
table_partition_cols,
- overwrite: conf.overwrite,
+ insert_op,
keep_partition_by_columns: conf.keep_partition_by_columns,
})
}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 6981c77228..6f6065a1c2 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -642,8 +642,8 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
table_paths,
output_schema: Some(conf.output_schema.as_ref().try_into()?),
table_partition_cols,
- overwrite: conf.overwrite,
keep_partition_by_columns: conf.keep_partition_by_columns,
+ insert_op: conf.insert_op as i32,
})
}
}
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index db84a08e5b..025676f790 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -27,6 +27,7 @@ use arrow::csv::WriterBuilder;
use arrow::datatypes::{Fields, TimeUnit};
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion_expr::dml::InsertOp;
use datafusion_functions_aggregate::approx_percentile_cont::approx_percentile_cont_udaf;
use datafusion_functions_aggregate::array_agg::array_agg_udaf;
use datafusion_functions_aggregate::min_max::max_udaf;
@@ -1143,7 +1144,7 @@ fn roundtrip_json_sink() -> Result<()> {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
- overwrite: true,
+ insert_op: InsertOp::Overwrite,
keep_partition_by_columns: true,
};
let data_sink = Arc::new(JsonSink::new(
@@ -1179,7 +1180,7 @@ fn roundtrip_csv_sink() -> Result<()> {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
- overwrite: true,
+ insert_op: InsertOp::Overwrite,
keep_partition_by_columns: true,
};
let data_sink = Arc::new(CsvSink::new(
@@ -1238,7 +1239,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
- overwrite: true,
+ insert_op: InsertOp::Overwrite,
keep_partition_by_columns: true,
};
let data_sink = Arc::new(ParquetSink::new(
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index 29dfe25993..895285c597 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -37,7 +37,7 @@ use datafusion_common::{
DataFusionError, Result, ScalarValue, SchemaError, SchemaReference, TableReference,
ToDFSchema,
};
-use datafusion_expr::dml::CopyTo;
+use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
use datafusion_expr::logical_plan::builder::project;
use datafusion_expr::logical_plan::DdlStatement;
@@ -53,7 +53,7 @@ use datafusion_expr::{
TransactionConclusion, TransactionEnd, TransactionIsolationLevel,
TransactionStart,
Volatility, WriteOp,
};
-use sqlparser::ast;
+use sqlparser::ast::{self, SqliteOnConflict};
use sqlparser::ast::{
Assignment, AssignmentTarget, ColumnDef, CreateIndex, CreateTable,
CreateTableOptions, Delete, DescribeAlias, Expr as SQLExpr, FromTable,
Ident, Insert,
@@ -665,12 +665,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
returning,
ignore,
table_alias,
- replace_into,
+ mut replace_into,
priority,
insert_alias,
}) => {
- if or.is_some() {
- plan_err!("Inserts with or clauses not supported")?;
+ if let Some(or) = or {
+ match or {
+ SqliteOnConflict::Replace => replace_into = true,
+ _ => plan_err!("Inserts with {or} clause is not
supported")?,
+ }
}
if partitioned.is_some() {
plan_err!("Partitioned inserts not yet supported")?;
@@ -698,9 +701,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"Inserts with a table alias not supported:
{table_alias:?}"
)?
};
- if replace_into {
- plan_err!("Inserts with a `REPLACE INTO` clause not
supported")?
- };
if let Some(priority) = priority {
plan_err!(
"Inserts with a `PRIORITY` clause not supported:
{priority:?}"
@@ -710,7 +710,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan_err!("Inserts with an alias not supported")?;
}
let _ = into; // optional keyword doesn't change behavior
- self.insert_to_plan(table_name, columns, source, overwrite)
+ self.insert_to_plan(table_name, columns, source, overwrite, replace_into)
}
Statement::Update {
table,
@@ -1605,6 +1605,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
columns: Vec<Ident>,
source: Box<Query>,
overwrite: bool,
+ replace_into: bool,
) -> Result<LogicalPlan> {
// Do a table lookup to verify the table exists
let table_name = self.object_name_to_table_reference(table_name)?;
@@ -1707,16 +1708,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Result<Vec<Expr>>>()?;
let source = project(source, exprs)?;
- let op = if overwrite {
- WriteOp::InsertOverwrite
- } else {
- WriteOp::InsertInto
+ let insert_op = match (overwrite, replace_into) {
+ (false, false) => InsertOp::Append,
+ (true, false) => InsertOp::Overwrite,
+ (false, true) => InsertOp::Replace,
+ (true, true) => plan_err!("Conflicting insert operations: `overwrite` and `replace_into` cannot both be true")?,
};
let plan = LogicalPlan::Dml(DmlStatement::new(
table_name,
Arc::new(table_schema),
- op,
+ WriteOp::Insert(insert_op),
Arc::new(source),
));
Ok(plan)
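
For `TableProvider` implementors, the hunks above mean `insert_into` now receives the parsed operation directly. The following is a sketch of how a provider might dispatch on it, assuming only the `InsertOp` API added in this commit; the actual sink construction is elided:

    use datafusion_common::{not_impl_err, Result};
    use datafusion_expr::dml::InsertOp;

    // Inside a custom `TableProvider::insert_into`, the provider can now
    // distinguish all three operations instead of a single overwrite flag.
    fn plan_for(insert_op: InsertOp) -> Result<&'static str> {
        match insert_op {
            InsertOp::Append => Ok("append the input rows to existing data"),
            InsertOp::Overwrite => Ok("truncate existing data, then write the input"),
            // Providers without key-based upsert can keep rejecting this case:
            InsertOp::Replace => not_impl_err!("Replace Into is not supported by this table"),
        }
    }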
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]