This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 325a3fbe76 Remove FileWriterMode and ListingTableInsertMode (#7994) (#8017)
325a3fbe76 is described below

commit 325a3fbe7623d3df0ab64867545c4d93a0c96015
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Nov 17 22:14:27 2023 +0000

    Remove FileWriterMode and ListingTableInsertMode (#7994) (#8017)
    
    * Remove FileWriterMode Support (#7994)
    
    * Don't ignore test
    
    * Error on insert to single file
    
    * Improve DisplayAs
---
 datafusion/core/src/datasource/file_format/csv.rs  |  74 +----
 datafusion/core/src/datasource/file_format/json.rs |  59 +---
 .../core/src/datasource/file_format/options.rs     |  32 +-
 .../core/src/datasource/file_format/parquet.rs     |  67 ++--
 .../core/src/datasource/file_format/write/mod.rs   | 204 ++----------
 .../datasource/file_format/write/orchestration.rs  | 111 +------
 datafusion/core/src/datasource/listing/mod.rs      |   4 +-
 datafusion/core/src/datasource/listing/table.rs    | 343 +--------------------
 datafusion/core/src/datasource/listing/url.rs      |   8 +-
 .../core/src/datasource/listing_table_factory.rs   |  24 +-
 .../core/src/datasource/physical_plan/mod.rs       |   7 +-
 datafusion/core/src/datasource/stream.rs           |  11 +-
 datafusion/core/src/physical_planner.rs            |   2 -
 datafusion/proto/proto/datafusion.proto            |   9 +-
 datafusion/proto/src/generated/pbjson.rs           |  94 ------
 datafusion/proto/src/generated/prost.rs            |  31 --
 datafusion/proto/src/physical_plan/from_proto.rs   |  12 -
 datafusion/proto/src/physical_plan/to_proto.rs     |  13 -
 .../proto/tests/cases/roundtrip_physical_plan.rs   |   2 -
 datafusion/sqllogictest/test_files/copy.slt        |   2 +-
 datafusion/sqllogictest/test_files/errors.slt      |   2 +-
 datafusion/sqllogictest/test_files/explain.slt     |   4 +-
 datafusion/sqllogictest/test_files/insert.slt      |   2 +-
 .../sqllogictest/test_files/insert_to_external.slt |  24 +-
 datafusion/sqllogictest/test_files/joins.slt       |   1 -
 datafusion/sqllogictest/test_files/options.slt     |   4 +-
 datafusion/sqllogictest/test_files/order.slt       |   2 +-
 datafusion/sqllogictest/test_files/predicates.slt  |   1 +
 .../sqllogictest/test_files/set_variable.slt       |   2 +-
 datafusion/sqllogictest/test_files/update.slt      |   2 +-
 30 files changed, 127 insertions(+), 1026 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 5f2084bc80..684f416f77 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -34,10 +34,10 @@ use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, ObjectStore};
 
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
 use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::physical_plan::{
     CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
 };
@@ -465,11 +465,7 @@ impl DisplayAs for CsvSink {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "CsvSink(writer_mode={:?}, file_groups=",
-                    self.config.writer_mode
-                )?;
+                write!(f, "CsvSink(file_groups=",)?;
                 FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                 write!(f, ")")
             }
@@ -481,55 +477,6 @@ impl CsvSink {
     fn new(config: FileSinkConfig) -> Self {
         Self { config }
     }
-
-    async fn append_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        if !self.config.table_partition_cols.is_empty() {
-            return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
-        }
-        let writer_options = self.config.file_type_writer_options.try_into_csv()?;
-        let (builder, compression) =
-            (&writer_options.writer_options, &writer_options.compression);
-        let compression = FileCompressionType::from(*compression);
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.config.object_store_url)?;
-        let file_groups = &self.config.file_groups;
-
-        let builder_clone = builder.clone();
-        let options_clone = writer_options.clone();
-        let get_serializer = move |file_size| {
-            let inner_clone = builder_clone.clone();
-            // In append mode, consider has_header flag only when file is empty (at the start).
-            // For other modes, use has_header flag as is.
-            let serializer: Box<dyn BatchSerializer> = Box::new(if file_size > 0 {
-                CsvSerializer::new()
-                    .with_builder(inner_clone)
-                    .with_header(false)
-            } else {
-                CsvSerializer::new()
-                    .with_builder(inner_clone)
-                    .with_header(options_clone.writer_options.header())
-            });
-            serializer
-        };
-
-        stateless_append_all(
-            data,
-            context,
-            object_store,
-            file_groups,
-            self.config.unbounded_input,
-            compression,
-            Box::new(get_serializer),
-        )
-        .await
-    }
-
     async fn multipartput_all(
         &self,
         data: SendableRecordBatchStream,
@@ -577,19 +524,8 @@ impl DataSink for CsvSink {
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        match self.config.writer_mode {
-            FileWriterMode::Append => {
-                let total_count = self.append_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::PutMultipart => {
-                let total_count = self.multipartput_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::Put => {
-                return not_impl_err!("FileWriterMode::Put is not supported yet!")
-            }
-        }
+        let total_count = self.multipartput_all(data, context).await?;
+        Ok(total_count)
     }
 }
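
With the Append and Put variants gone, every CSV write funnels through the
multipart path; a minimal sketch of the surviving control flow (simplified
from the hunk above; the JsonSink change below follows the same shape):

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        // The former match on FileWriterMode reduces to a single call:
        // multipart put is now the only supported write mode.
        self.multipartput_all(data, context).await
    }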
 
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 8d62d0a858..9893a1db45 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec;
 use crate::physical_plan::SendableRecordBatchStream;
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
 
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
 use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
 use crate::error::Result;
@@ -245,11 +245,7 @@ impl DisplayAs for JsonSink {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "JsonSink(writer_mode={:?}, file_groups=",
-                    self.config.writer_mode
-                )?;
+                write!(f, "JsonSink(file_groups=",)?;
                 FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                 write!(f, ")")
             }
@@ -268,40 +264,6 @@ impl JsonSink {
         &self.config
     }
 
-    async fn append_all(
-        &self,
-        data: SendableRecordBatchStream,
-        context: &Arc<TaskContext>,
-    ) -> Result<u64> {
-        if !self.config.table_partition_cols.is_empty() {
-            return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
-        }
-
-        let writer_options = self.config.file_type_writer_options.try_into_json()?;
-        let compression = &writer_options.compression;
-
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.config.object_store_url)?;
-        let file_groups = &self.config.file_groups;
-
-        let get_serializer = move |_| {
-            let serializer: Box<dyn BatchSerializer> = Box::new(JsonSerializer::new());
-            serializer
-        };
-
-        stateless_append_all(
-            data,
-            context,
-            object_store,
-            file_groups,
-            self.config.unbounded_input,
-            (*compression).into(),
-            Box::new(get_serializer),
-        )
-        .await
-    }
-
     async fn multipartput_all(
         &self,
         data: SendableRecordBatchStream,
@@ -342,19 +304,8 @@ impl DataSink for JsonSink {
         data: SendableRecordBatchStream,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
-        match self.config.writer_mode {
-            FileWriterMode::Append => {
-                let total_count = self.append_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::PutMultipart => {
-                let total_count = self.multipartput_all(data, context).await?;
-                Ok(total_count)
-            }
-            FileWriterMode::Put => {
-                return not_impl_err!("FileWriterMode::Put is not supported yet!")
-            }
-        }
+        let total_count = self.multipartput_all(data, context).await?;
+        Ok(total_count)
     }
 }
 
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index 41a70e6d2f..4c7557a4a9 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -28,7 +28,7 @@ use crate::datasource::file_format::file_compression_type::FileCompressionType;
 #[cfg(feature = "parquet")]
 use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
-use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
+use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::{
     file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
     listing::ListingOptions,
@@ -76,8 +76,6 @@ pub struct CsvReadOptions<'a> {
     pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Setting controls how inserts to this file should be handled
-    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for CsvReadOptions<'a> {
@@ -101,7 +99,6 @@ impl<'a> CsvReadOptions<'a> {
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
             file_sort_order: vec![],
-            insert_mode: ListingTableInsertMode::AppendToFile,
         }
     }
 
@@ -184,12 +181,6 @@ impl<'a> CsvReadOptions<'a> {
         self.file_sort_order = file_sort_order;
         self
     }
-
-    /// Configure how insertions to this table should be handled
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
 }
 
 /// Options that control the reading of Parquet files.
@@ -219,8 +210,6 @@ pub struct ParquetReadOptions<'a> {
     pub schema: Option<&'a Schema>,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Setting controls how inserts to this file should be handled
-    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for ParquetReadOptions<'a> {
@@ -232,7 +221,6 @@ impl<'a> Default for ParquetReadOptions<'a> {
             skip_metadata: None,
             schema: None,
             file_sort_order: vec![],
-            insert_mode: ListingTableInsertMode::AppendNewFiles,
         }
     }
 }
@@ -272,12 +260,6 @@ impl<'a> ParquetReadOptions<'a> {
         self.file_sort_order = file_sort_order;
         self
     }
-
-    /// Configure how insertions to this table should be handled
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
 }
 
 /// Options that control the reading of ARROW files.
@@ -403,8 +385,6 @@ pub struct NdJsonReadOptions<'a> {
     pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Setting controls how inserts to this file should be handled
-    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for NdJsonReadOptions<'a> {
@@ -417,7 +397,6 @@ impl<'a> Default for NdJsonReadOptions<'a> {
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
             file_sort_order: vec![],
-            insert_mode: ListingTableInsertMode::AppendToFile,
         }
     }
 }
@@ -464,12 +443,6 @@ impl<'a> NdJsonReadOptions<'a> {
         self.file_sort_order = file_sort_order;
         self
     }
-
-    /// Configure how insertions to this table should be handled
-    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
 }
 
 #[async_trait]
@@ -528,7 +501,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_file_sort_order(self.file_sort_order.clone())
             .with_infinite_source(self.infinite)
-            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
@@ -555,7 +527,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
             .with_target_partitions(config.target_partitions())
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_file_sort_order(self.file_sort_order.clone())
-            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
@@ -582,7 +553,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_infinite_source(self.infinite)
             .with_file_sort_order(self.file_sort_order.clone())
-            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
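
For API users the removal mostly just shrinks the builder surface; a
hypothetical registration after this change (assuming `ctx` is a
SessionContext; the removed `.insert_mode(...)` call would no longer compile):

    let options = CsvReadOptions::new()
        .has_header(true)
        .file_compression_type(FileCompressionType::UNCOMPRESSED);
    // .insert_mode(ListingTableInsertMode::AppendToFile) -- removed;
    // inserts now always write new files under the table path.
    ctx.register_csv("t", "data/", options).await?;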
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 2cba474e55..c4d05adfc6 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -40,11 +40,12 @@ use crate::datasource::statistics::{create_max_min_accs, get_col_stats};
 use arrow::datatypes::SchemaRef;
 use arrow::datatypes::{Fields, Schema};
 use bytes::{BufMut, BytesMut};
-use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError, FileType};
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
 use futures::{StreamExt, TryStreamExt};
 use hashbrown::HashMap;
+use object_store::path::Path;
 use object_store::{ObjectMeta, ObjectStore};
 use parquet::arrow::{
     arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
@@ -55,7 +56,7 @@ use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::Statistics as ParquetStatistics;
 
 use super::write::demux::start_demuxer_task;
-use super::write::{create_writer, AbortableWrite, FileWriterMode};
+use super::write::{create_writer, AbortableWrite};
 use super::{FileFormat, FileScanConfig};
 use crate::arrow::array::{
     BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
@@ -64,7 +65,7 @@ use crate::arrow::datatypes::DataType;
 use crate::config::ConfigOptions;
 
 use crate::datasource::physical_plan::{
-    FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter,
+    FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter,
 };
 use crate::error::Result;
 use crate::execution::context::SessionState;
@@ -596,11 +597,7 @@ impl DisplayAs for ParquetSink {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(
-                    f,
-                    "ParquetSink(writer_mode={:?}, file_groups=",
-                    self.config.writer_mode
-                )?;
+                write!(f, "ParquetSink(file_groups=",)?;
                 FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
                 write!(f, ")")
             }
@@ -642,36 +639,23 @@ impl ParquetSink {
     /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized
     async fn create_async_arrow_writer(
         &self,
-        file_meta: FileMeta,
+        location: &Path,
         object_store: Arc<dyn ObjectStore>,
         parquet_props: WriterProperties,
     ) -> Result<
         AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send + Unpin>>,
     > {
-        let object = &file_meta.object_meta;
-        match self.config.writer_mode {
-            FileWriterMode::Append => {
-                plan_err!(
-                    "Appending to Parquet files is not supported by the file format!"
-                )
-            }
-            FileWriterMode::Put => {
-                not_impl_err!("FileWriterMode::Put is not implemented for ParquetSink")
-            }
-            FileWriterMode::PutMultipart => {
-                let (_, multipart_writer) = object_store
-                    .put_multipart(&object.location)
-                    .await
-                    .map_err(DataFusionError::ObjectStore)?;
-                let writer = AsyncArrowWriter::try_new(
-                    multipart_writer,
-                    self.get_writer_schema(),
-                    10485760,
-                    Some(parquet_props),
-                )?;
-                Ok(writer)
-            }
-        }
+        let (_, multipart_writer) = object_store
+            .put_multipart(location)
+            .await
+            .map_err(DataFusionError::ObjectStore)?;
+        let writer = AsyncArrowWriter::try_new(
+            multipart_writer,
+            self.get_writer_schema(),
+            10485760,
+            Some(parquet_props),
+        )?;
+        Ok(writer)
     }
 }
 
@@ -730,13 +714,7 @@ impl DataSink for ParquetSink {
             if !allow_single_file_parallelism {
                 let mut writer = self
                     .create_async_arrow_writer(
-                        ObjectMeta {
-                            location: path,
-                            last_modified: chrono::offset::Utc::now(),
-                            size: 0,
-                            e_tag: None,
-                        }
-                        .into(),
+                        &path,
                         object_store.clone(),
                         parquet_props.clone(),
                     )
@@ -752,17 +730,10 @@ impl DataSink for ParquetSink {
                 });
             } else {
                 let writer = create_writer(
-                    FileWriterMode::PutMultipart,
                     // Parquet files as a whole are never compressed, since they
                     // manage compressed blocks themselves.
                     FileCompressionType::UNCOMPRESSED,
-                    ObjectMeta {
-                        location: path,
-                        last_modified: chrono::offset::Utc::now(),
-                        size: 0,
-                        e_tag: None,
-                    }
-                    .into(),
+                    &path,
                     object_store.clone(),
                 )
                 .await?;
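
On the caller side the placeholder ObjectMeta (fake last_modified, zero size)
disappears entirely; the non-parallelized sink path reduces to roughly the
following (a sketch; 10485760 above is the 10 MiB multipart buffer size):

    // Both branches now build on put_multipart; no writer-mode dispatch.
    let mut writer = self
        .create_async_arrow_writer(&path, object_store.clone(), parquet_props.clone())
        .await?;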
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs b/datafusion/core/src/datasource/file_format/write/mod.rs
index 770c7a49c3..cfcdbd8c46 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write/mod.rs
@@ -19,128 +19,32 @@
 //! write support for the various file formats
 
 use std::io::Error;
-use std::mem;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
 
-use crate::datasource::physical_plan::FileMeta;
 use crate::error::Result;
 
 use arrow_array::RecordBatch;
 
-use datafusion_common::{exec_err, DataFusionError};
+use datafusion_common::DataFusionError;
 
 use async_trait::async_trait;
 use bytes::Bytes;
 
 use futures::future::BoxFuture;
-use futures::ready;
-use futures::FutureExt;
 use object_store::path::Path;
-use object_store::{MultipartId, ObjectMeta, ObjectStore};
+use object_store::{MultipartId, ObjectStore};
 
 use tokio::io::AsyncWrite;
 
 pub(crate) mod demux;
 pub(crate) mod orchestration;
 
-/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
-/// It is specifically designed for the `object_store` crate's `put` method and sends
-/// whole bytes at once when the buffer is flushed.
-pub struct AsyncPutWriter {
-    /// Object metadata
-    object_meta: ObjectMeta,
-    /// A shared reference to the object store
-    store: Arc<dyn ObjectStore>,
-    /// A buffer that stores the bytes to be sent
-    current_buffer: Vec<u8>,
-    /// Used for async handling in flush method
-    inner_state: AsyncPutState,
-}
-
-impl AsyncPutWriter {
-    /// Constructor for the `AsyncPutWriter` object
-    pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
-        Self {
-            object_meta,
-            store,
-            current_buffer: vec![],
-            // The writer starts out in buffering mode
-            inner_state: AsyncPutState::Buffer,
-        }
-    }
-
-    /// Separate implementation function that unpins the [`AsyncPutWriter`] so
-    /// that partial borrows work correctly
-    fn poll_shutdown_inner(
-        &mut self,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        loop {
-            match &mut self.inner_state {
-                AsyncPutState::Buffer => {
-                    // Convert the current buffer to bytes and take ownership of it
-                    let bytes = Bytes::from(mem::take(&mut self.current_buffer));
-                    // Set the inner state to Put variant with the bytes
-                    self.inner_state = AsyncPutState::Put { bytes }
-                }
-                AsyncPutState::Put { bytes } => {
-                    // Send the bytes to the object store's put method
-                    return Poll::Ready(
-                        ready!(self
-                            .store
-                            .put(&self.object_meta.location, bytes.clone())
-                            .poll_unpin(cx))
-                        .map_err(Error::from),
-                    );
-                }
-            }
-        }
-    }
-}
-
-/// An enum that represents the inner state of AsyncPut
-enum AsyncPutState {
-    /// Building Bytes struct in this state
-    Buffer,
-    /// Data in the buffer is being sent to the object store
-    Put { bytes: Bytes },
-}
-
-impl AsyncWrite for AsyncPutWriter {
-    // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct
-    fn poll_write(
-        mut self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, Error>> {
-        // Extend the current buffer with the incoming buffer
-        self.current_buffer.extend_from_slice(buf);
-        // Return a ready poll with the length of the incoming buffer
-        Poll::Ready(Ok(buf.len()))
-    }
-
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        // Return a ready poll with an empty result
-        Poll::Ready(Ok(()))
-    }
-
-    fn poll_shutdown(
-        mut self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        // Call the poll_shutdown_inner method to handle the actual sending of data to the object store
-        self.poll_shutdown_inner(cx)
-    }
-}
-
 /// Stores data needed during abortion of MultiPart writers
+#[derive(Clone)]
 pub(crate) struct MultiPart {
     /// A shared reference to the object store
     store: Arc<dyn ObjectStore>,
@@ -163,45 +67,28 @@ impl MultiPart {
     }
 }
 
-pub(crate) enum AbortMode {
-    Put,
-    Append,
-    MultiPart(MultiPart),
-}
-
 /// A wrapper struct with abort method and writer
 pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
     writer: W,
-    mode: AbortMode,
+    multipart: MultiPart,
 }
 
 impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
     /// Create a new `AbortableWrite` instance with the given writer, and write mode.
-    pub(crate) fn new(writer: W, mode: AbortMode) -> Self {
-        Self { writer, mode }
+    pub(crate) fn new(writer: W, multipart: MultiPart) -> Self {
+        Self { writer, multipart }
     }
 
     /// handling of abort for different write modes
     pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
-        match &self.mode {
-            AbortMode::Put => Ok(async { Ok(()) }.boxed()),
-            AbortMode::Append => exec_err!("Cannot abort in append mode"),
-            AbortMode::MultiPart(MultiPart {
-                store,
-                multipart_id,
-                location,
-            }) => {
-                let location = location.clone();
-                let multipart_id = multipart_id.clone();
-                let store = store.clone();
-                Ok(Box::pin(async move {
-                    store
-                        .abort_multipart(&location, &multipart_id)
-                        .await
-                        .map_err(DataFusionError::ObjectStore)
-                }))
-            }
-        }
+        let multi = self.multipart.clone();
+        Ok(Box::pin(async move {
+            multi
+                .store
+                .abort_multipart(&multi.location, &multi.multipart_id)
+                .await
+                .map_err(DataFusionError::ObjectStore)
+        }))
     }
 }
 
@@ -229,16 +116,6 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AbortableWrite<W> {
     }
 }
 
-/// An enum that defines different file writer modes.
-#[derive(Debug, Clone, Copy)]
-pub enum FileWriterMode {
-    /// Data is appended to an existing file.
-    Append,
-    /// Data is written to a new file.
-    Put,
-    /// Data is written to a new file in multiple parts.
-    PutMultipart,
-}
 /// A trait that defines the methods required for a RecordBatch serializer.
 #[async_trait]
 pub trait BatchSerializer: Unpin + Send {
@@ -255,51 +132,16 @@ pub trait BatchSerializer: Unpin + Send {
 /// Returns an [`AbortableWrite`] which writes to the given object store location
 /// with the specified compression
 pub(crate) async fn create_writer(
-    writer_mode: FileWriterMode,
     file_compression_type: FileCompressionType,
-    file_meta: FileMeta,
+    location: &Path,
     object_store: Arc<dyn ObjectStore>,
 ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
-    let object = &file_meta.object_meta;
-    match writer_mode {
-        // If the mode is append, call the store's append method and return wrapped in
-        // a boxed trait object.
-        FileWriterMode::Append => {
-            let writer = object_store
-                .append(&object.location)
-                .await
-                .map_err(DataFusionError::ObjectStore)?;
-            let writer = AbortableWrite::new(
-                file_compression_type.convert_async_writer(writer)?,
-                AbortMode::Append,
-            );
-            Ok(writer)
-        }
-        // If the mode is put, create a new AsyncPut writer and return it wrapped in
-        // a boxed trait object
-        FileWriterMode::Put => {
-            let writer = Box::new(AsyncPutWriter::new(object.clone(), object_store));
-            let writer = AbortableWrite::new(
-                file_compression_type.convert_async_writer(writer)?,
-                AbortMode::Put,
-            );
-            Ok(writer)
-        }
-        // If the mode is put multipart, call the store's put_multipart method and
-        // return the writer wrapped in a boxed trait object.
-        FileWriterMode::PutMultipart => {
-            let (multipart_id, writer) = object_store
-                .put_multipart(&object.location)
-                .await
-                .map_err(DataFusionError::ObjectStore)?;
-            Ok(AbortableWrite::new(
-                file_compression_type.convert_async_writer(writer)?,
-                AbortMode::MultiPart(MultiPart::new(
-                    object_store,
-                    multipart_id,
-                    object.location.clone(),
-                )),
-            ))
-        }
-    }
+    let (multipart_id, writer) = object_store
+        .put_multipart(location)
+        .await
+        .map_err(DataFusionError::ObjectStore)?;
+    Ok(AbortableWrite::new(
+        file_compression_type.convert_async_writer(writer)?,
+        MultiPart::new(object_store, multipart_id, location.clone()),
+    ))
 }
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index f84baa9ac2..2ae6b70ed1 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -22,7 +22,6 @@
 use std::sync::Arc;
 
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::listing::PartitionedFile;
 use crate::datasource::physical_plan::FileSinkConfig;
 use crate::error::Result;
 use crate::physical_plan::SendableRecordBatchStream;
@@ -34,17 +33,13 @@ use datafusion_common::DataFusionError;
 use bytes::Bytes;
 use datafusion_execution::TaskContext;
 
-use futures::StreamExt;
-
-use object_store::{ObjectMeta, ObjectStore};
-
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio::sync::mpsc::{self, Receiver};
 use tokio::task::{JoinHandle, JoinSet};
 use tokio::try_join;
 
 use super::demux::start_demuxer_task;
-use super::{create_writer, AbortableWrite, BatchSerializer, FileWriterMode};
+use super::{create_writer, AbortableWrite, BatchSerializer};
 
 type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
 type SerializerType = Box<dyn BatchSerializer>;
@@ -274,21 +269,9 @@ pub(crate) async fn stateless_multipart_put(
         stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input)
             .await
     });
-    while let Some((output_location, rb_stream)) = file_stream_rx.recv().await {
+    while let Some((location, rb_stream)) = file_stream_rx.recv().await {
         let serializer = get_serializer();
-        let object_meta = ObjectMeta {
-            location: output_location,
-            last_modified: chrono::offset::Utc::now(),
-            size: 0,
-            e_tag: None,
-        };
-        let writer = create_writer(
-            FileWriterMode::PutMultipart,
-            compression,
-            object_meta.into(),
-            object_store.clone(),
-        )
-        .await?;
+        let writer = create_writer(compression, &location, object_store.clone()).await?;
 
         tx_file_bundle
             .send((rb_stream, serializer, writer))
@@ -325,91 +308,3 @@ pub(crate) async fn stateless_multipart_put(
 
     Ok(total_count)
 }
-
-/// Orchestrates append_all for any statelessly serialized file type. Appends to all files provided
-/// in a round robin fashion.
-pub(crate) async fn stateless_append_all(
-    mut data: SendableRecordBatchStream,
-    context: &Arc<TaskContext>,
-    object_store: Arc<dyn ObjectStore>,
-    file_groups: &Vec<PartitionedFile>,
-    unbounded_input: bool,
-    compression: FileCompressionType,
-    get_serializer: Box<dyn Fn(usize) -> Box<dyn BatchSerializer> + Send>,
-) -> Result<u64> {
-    let rb_buffer_size = &context
-        .session_config()
-        .options()
-        .execution
-        .max_buffered_batches_per_output_file;
-
-    let (tx_file_bundle, rx_file_bundle) = tokio::sync::mpsc::channel(file_groups.len());
-    let mut send_channels = vec![];
-    for file_group in file_groups {
-        let serializer = get_serializer(file_group.object_meta.size);
-
-        let file = file_group.clone();
-        let writer = create_writer(
-            FileWriterMode::Append,
-            compression,
-            file.object_meta.clone().into(),
-            object_store.clone(),
-        )
-        .await?;
-
-        let (tx, rx) = tokio::sync::mpsc::channel(rb_buffer_size / 2);
-        send_channels.push(tx);
-        tx_file_bundle
-            .send((rx, serializer, writer))
-            .await
-            .map_err(|_| {
-                DataFusionError::Internal(
-                    "Writer receive file bundle channel closed unexpectedly!".into(),
-                )
-            })?;
-    }
-
-    let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel();
-    let write_coordinater_task = tokio::spawn(async move {
-        stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt, unbounded_input)
-            .await
-    });
-
-    // Append to file groups in round robin
-    let mut next_file_idx = 0;
-    while let Some(rb) = data.next().await.transpose()? {
-        send_channels[next_file_idx].send(rb).await.map_err(|_| {
-            DataFusionError::Internal(
-                "Recordbatch file append stream closed unexpectedly!".into(),
-            )
-        })?;
-        next_file_idx = (next_file_idx + 1) % send_channels.len();
-        if unbounded_input {
-            tokio::task::yield_now().await;
-        }
-    }
-    // Signal to the write coordinater that no more files are coming
-    drop(tx_file_bundle);
-    drop(send_channels);
-
-    let total_count = rx_row_cnt.await.map_err(|_| {
-        DataFusionError::Internal(
-            "Did not receieve row count from write coordinater".into(),
-        )
-    })?;
-
-    match try_join!(write_coordinater_task) {
-        Ok(r1) => {
-            r1.0?;
-        }
-        Err(e) => {
-            if e.is_panic() {
-                std::panic::resume_unwind(e.into_panic());
-            } else {
-                unreachable!();
-            }
-        }
-    }
-
-    Ok(total_count)
-}
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 8b0f021f02..aa2e20164b 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -31,9 +31,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 
 pub use self::url::ListingTableUrl;
-pub use table::{
-    ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode,
-};
+pub use table::{ListingOptions, ListingTable, ListingTableConfig};
 
 /// Stream of files get listed from object store
 pub type PartitionedFileStream =
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index c22eb58e88..515bc8a9e6 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -214,33 +214,6 @@ impl ListingTableConfig {
     }
 }
 
-#[derive(Debug, Clone)]
-///controls how new data should be inserted to a ListingTable
-pub enum ListingTableInsertMode {
-    ///Data should be appended to an existing file
-    AppendToFile,
-    ///Data is appended as new files in existing TablePaths
-    AppendNewFiles,
-    ///Throw an error if insert into is attempted on this table
-    Error,
-}
-
-impl FromStr for ListingTableInsertMode {
-    type Err = DataFusionError;
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        let s_lower = s.to_lowercase();
-        match s_lower.as_str() {
-            "append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
-            "append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
-            "error" => Ok(ListingTableInsertMode::Error),
-            _ => plan_err!(
-                "Unknown or unsupported insert mode {s}. Supported options are \
-                append_to_file, append_new_files, and error."
-            ),
-        }
-    }
-}
-
 /// Options for creating a [`ListingTable`]
 #[derive(Clone, Debug)]
 pub struct ListingOptions {
@@ -279,8 +252,6 @@ pub struct ListingOptions {
     /// In order to support infinite inputs, DataFusion may adjust query
     /// plans (e.g. joins) to run the given query in full pipelining mode.
     pub infinite_source: bool,
-    /// This setting controls how inserts to this table should be handled
-    pub insert_mode: ListingTableInsertMode,
     /// This setting when true indicates that the table is backed by a single file.
     /// Any inserts to the table may only append to this existing file.
     pub single_file: bool,
@@ -305,7 +276,6 @@ impl ListingOptions {
             target_partitions: 1,
             file_sort_order: vec![],
             infinite_source: false,
-            insert_mode: ListingTableInsertMode::AppendToFile,
             single_file: false,
             file_type_write_options: None,
         }
@@ -476,12 +446,6 @@ impl ListingOptions {
         self
     }
 
-    /// Configure how insertions to this table should be handled.
-    pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
-        self.insert_mode = insert_mode;
-        self
-    }
-
     /// Configure if this table is backed by a single file
     pub fn with_single_file(mut self, single_file: bool) -> Self {
         self.single_file = single_file;
@@ -806,6 +770,13 @@ impl TableProvider for ListingTable {
         }
 
         let table_path = &self.table_paths()[0];
+        if !table_path.is_collection() {
+            return plan_err!(
+                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
+                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
+            );
+        }
+
         // Get the object store for the table path.
         let store = state.runtime_env().object_store(table_path)?;
 
@@ -820,31 +791,6 @@ impl TableProvider for ListingTable {
         .await?;
 
         let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
-        //if we are writing a single output_partition to a table backed by a single file
-        //we can append to that file. Otherwise, we can write new files into the directory
-        //adding new files to the listing table in order to insert to the table.
-        let input_partitions = input.output_partitioning().partition_count();
-        let writer_mode = match self.options.insert_mode {
-            ListingTableInsertMode::AppendToFile => {
-                if input_partitions > file_groups.len() {
-                    return plan_err!(
-                        "Cannot append {input_partitions} partitions to {} files!",
-                        file_groups.len()
-                    );
-                }
-
-                crate::datasource::file_format::write::FileWriterMode::Append
-            }
-            ListingTableInsertMode::AppendNewFiles => {
-                crate::datasource::file_format::write::FileWriterMode::PutMultipart
-            }
-            ListingTableInsertMode::Error => {
-                return plan_err!(
-                    "Invalid plan attempting write to table with TableWriteMode::Error!"
-                );
-            }
-        };
-
         let file_format = self.options().format.as_ref();
 
         let file_type_writer_options = match &self.options().file_type_write_options {
@@ -862,7 +808,6 @@ impl TableProvider for ListingTable {
             file_groups,
             output_schema: self.schema(),
             table_partition_cols: self.options.table_partition_cols.clone(),
-            writer_mode,
             // A plan can produce finite number of rows even if it has unbounded sources, like LIMIT
             // queries. Thus, we can check if the plan is streaming to ensure file sink input is
             // unbounded. When `unbounded_input` flag is `true` for sink, we occasionally call `yield_now`
@@ -877,14 +822,6 @@ impl TableProvider for ListingTable {
 
         let unsorted: Vec<Vec<Expr>> = vec![];
         let order_requirements = if self.options().file_sort_order != unsorted {
-            if matches!(
-                self.options().insert_mode,
-                ListingTableInsertMode::AppendToFile
-            ) {
-                return plan_err!(
-                    "Cannot insert into a sorted ListingTable with mode append!"
-                );
-            }
             // Multiple sort orders in outer vec are equivalent, so we pass only the first one
             let ordering = self
                 .try_create_output_ordering()?
@@ -1003,7 +940,7 @@ mod tests {
     use crate::prelude::*;
     use crate::{
         assert_batches_eq,
-        datasource::file_format::{avro::AvroFormat, file_compression_type::FileTypeExt},
+        datasource::file_format::avro::AvroFormat,
         execution::options::ReadOptions,
         logical_expr::{col, lit},
         test::{columns, object_store::register_test_store},
@@ -1567,17 +1504,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_insert_into_append_to_json_file() -> Result<()> {
-        helper_test_insert_into_append_to_existing_files(
-            FileType::JSON,
-            FileCompressionType::UNCOMPRESSED,
-            None,
-        )
-        .await?;
-        Ok(())
-    }
-
     #[tokio::test]
     async fn test_insert_into_append_new_json_files() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
@@ -1596,17 +1522,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_insert_into_append_to_csv_file() -> Result<()> {
-        helper_test_insert_into_append_to_existing_files(
-            FileType::CSV,
-            FileCompressionType::UNCOMPRESSED,
-            None,
-        )
-        .await?;
-        Ok(())
-    }
-
     #[tokio::test]
     async fn test_insert_into_append_new_csv_files() -> Result<()> {
         let mut config_map: HashMap<String, String> = HashMap::new();
@@ -1663,13 +1578,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_insert_into_sql_csv_defaults() -> Result<()> {
-        helper_test_insert_into_sql(
-            "csv",
-            FileCompressionType::UNCOMPRESSED,
-            "OPTIONS (insert_mode 'append_new_files')",
-            None,
-        )
-        .await?;
+        helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None)
+            .await?;
         Ok(())
     }
 
@@ -1678,8 +1588,7 @@ mod tests {
         helper_test_insert_into_sql(
             "csv",
             FileCompressionType::UNCOMPRESSED,
-            "WITH HEADER ROW \
-            OPTIONS (insert_mode 'append_new_files')",
+            "WITH HEADER ROW",
             None,
         )
         .await?;
@@ -1688,13 +1597,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_insert_into_sql_json_defaults() -> Result<()> {
-        helper_test_insert_into_sql(
-            "json",
-            FileCompressionType::UNCOMPRESSED,
-            "OPTIONS (insert_mode 'append_new_files')",
-            None,
-        )
-        .await?;
+        helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None)
+            .await?;
         Ok(())
     }
 
@@ -1879,211 +1783,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> {
-        let maybe_err = helper_test_insert_into_append_to_existing_files(
-            FileType::PARQUET,
-            FileCompressionType::UNCOMPRESSED,
-            None,
-        )
-        .await;
-        let _err =
-            maybe_err.expect_err("Appending to existing parquet file did not fail!");
-        Ok(())
-    }
-
-    fn load_empty_schema_table(
-        schema: SchemaRef,
-        temp_path: &str,
-        insert_mode: ListingTableInsertMode,
-        file_format: Arc<dyn FileFormat>,
-    ) -> Result<Arc<dyn TableProvider>> {
-        File::create(temp_path)?;
-        let table_path = ListingTableUrl::parse(temp_path).unwrap();
-
-        let listing_options =
-            ListingOptions::new(file_format.clone()).with_insert_mode(insert_mode);
-
-        let config = ListingTableConfig::new(table_path)
-            .with_listing_options(listing_options)
-            .with_schema(schema);
-
-        let table = ListingTable::try_new(config)?;
-        Ok(Arc::new(table))
-    }
-
-    /// Logic of testing inserting into listing table by Appending to existing files
-    /// is the same for all formats/options which support this. This helper allows
-    /// passing different options to execute the same test with different settings.
-    async fn helper_test_insert_into_append_to_existing_files(
-        file_type: FileType,
-        file_compression_type: FileCompressionType,
-        session_config_map: Option<HashMap<String, String>>,
-    ) -> Result<()> {
-        // Create the initial context, schema, and batch.
-        let session_ctx = match session_config_map {
-            Some(cfg) => {
-                let config = SessionConfig::from_string_hash_map(cfg)?;
-                SessionContext::new_with_config(config)
-            }
-            None => SessionContext::new(),
-        };
-        // Create a new schema with one field called "a" of type Int32
-        let schema = Arc::new(Schema::new(vec![Field::new(
-            "column1",
-            DataType::Int32,
-            false,
-        )]));
-
-        // Create a new batch of data to insert into the table
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
-        )?;
-
-        // Filename with extension
-        let filename = format!(
-            "path{}",
-            file_type
-                .to_owned()
-                .get_ext_with_compression(file_compression_type)
-                .unwrap()
-        );
-
-        // Create a temporary directory and a CSV file within it.
-        let tmp_dir = TempDir::new()?;
-        let path = tmp_dir.path().join(filename);
-
-        let file_format: Arc<dyn FileFormat> = match file_type {
-            FileType::CSV => Arc::new(
-                CsvFormat::default().with_file_compression_type(file_compression_type),
-            ),
-            FileType::JSON => Arc::new(
-                JsonFormat::default().with_file_compression_type(file_compression_type),
-            ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
-            FileType::AVRO => Arc::new(AvroFormat {}),
-            FileType::ARROW => Arc::new(ArrowFormat {}),
-        };
-
-        let initial_table = load_empty_schema_table(
-            schema.clone(),
-            path.to_str().unwrap(),
-            ListingTableInsertMode::AppendToFile,
-            file_format,
-        )?;
-        session_ctx.register_table("t", initial_table)?;
-        // Create and register the source table with the provided schema and inserted data
-        let source_table = Arc::new(MemTable::try_new(
-            schema.clone(),
-            vec![vec![batch.clone(), batch.clone()]],
-        )?);
-        session_ctx.register_table("source", source_table.clone())?;
-        // Convert the source table into a provider so that it can be used in a query
-        let source = provider_as_source(source_table);
-        // Create a table scan logical plan to read from the source table
-        let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
-        // Create an insert plan to insert the source data into the initial table
-        let insert_into_table =
-            LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
-        // Create a physical plan from the insert plan
-        let plan = session_ctx
-            .state()
-            .create_physical_plan(&insert_into_table)
-            .await?;
-
-        // Execute the physical plan and collect the results
-        let res = collect(plan, session_ctx.task_ctx()).await?;
-        // Insert returns the number of rows written, in our case this would be 6.
-        let expected = [
-            "+-------+",
-            "| count |",
-            "+-------+",
-            "| 6     |",
-            "+-------+",
-        ];
-
-        // Assert that the batches read from the file match the expected result.
-        assert_batches_eq!(expected, &res);
-
-        // Read the records in the table
-        let batches = session_ctx.sql("select * from t").await?.collect().await?;
-
-        // Define the expected result as a vector of strings.
-        let expected = [
-            "+---------+",
-            "| column1 |",
-            "+---------+",
-            "| 1       |",
-            "| 2       |",
-            "| 3       |",
-            "| 1       |",
-            "| 2       |",
-            "| 3       |",
-            "+---------+",
-        ];
-
-        // Assert that the batches read from the file match the expected result.
-        assert_batches_eq!(expected, &batches);
-
-        // Assert that only 1 file was added to the table
-        let num_files = tmp_dir.path().read_dir()?.count();
-        assert_eq!(num_files, 1);
-
-        // Create a physical plan from the insert plan
-        let plan = session_ctx
-            .state()
-            .create_physical_plan(&insert_into_table)
-            .await?;
-
-        // Again, execute the physical plan and collect the results
-        let res = collect(plan, session_ctx.task_ctx()).await?;
-        // Insert returns the number of rows written, in our case this would be 6.
-        let expected = [
-            "+-------+",
-            "| count |",
-            "+-------+",
-            "| 6     |",
-            "+-------+",
-        ];
-
-        // Assert that the batches read from the file match the expected result.
-        assert_batches_eq!(expected, &res);
-
-        // Open the CSV file, read its contents as a record batch, and collect the batches into a vector.
-        let batches = session_ctx.sql("select * from t").await?.collect().await?;
-
-        // Define the expected result after the second append.
-        let expected = vec![
-            "+---------+",
-            "| column1 |",
-            "+---------+",
-            "| 1       |",
-            "| 2       |",
-            "| 3       |",
-            "| 1       |",
-            "| 2       |",
-            "| 3       |",
-            "| 1       |",
-            "| 2       |",
-            "| 3       |",
-            "| 1       |",
-            "| 2       |",
-            "| 3       |",
-            "+---------+",
-        ];
-
-        // Assert that the batches read from the file after the second append match the expected result.
-        assert_batches_eq!(expected, &batches);
-
-        // Assert that no additional files were added to the table
-        let num_files = tmp_dir.path().read_dir()?.count();
-        assert_eq!(num_files, 1);
-
-        // Return Ok if the function
-        Ok(())
-    }
-
     async fn helper_test_append_new_files_to_table(
         file_type: FileType,
         file_compression_type: FileCompressionType,
@@ -2129,7 +1828,6 @@ mod tests {
                         "t",
                         tmp_dir.path().to_str().unwrap(),
                         CsvReadOptions::new()
-                            .insert_mode(ListingTableInsertMode::AppendNewFiles)
                             .schema(schema.as_ref())
                             .file_compression_type(file_compression_type),
                     )
@@ -2141,7 +1839,6 @@ mod tests {
                         "t",
                         tmp_dir.path().to_str().unwrap(),
                         NdJsonReadOptions::default()
-                            .insert_mode(ListingTableInsertMode::AppendNewFiles)
                             .schema(schema.as_ref())
                             .file_compression_type(file_compression_type),
                     )
@@ -2152,9 +1849,7 @@ mod tests {
                     .register_parquet(
                         "t",
                         tmp_dir.path().to_str().unwrap(),
-                        ParquetReadOptions::default()
-                            .insert_mode(ListingTableInsertMode::AppendNewFiles)
-                            .schema(schema.as_ref()),
+                        ParquetReadOptions::default().schema(schema.as_ref()),
                     )
                     .await?;
             }
@@ -2163,10 +1858,7 @@ mod tests {
                     .register_avro(
                         "t",
                         tmp_dir.path().to_str().unwrap(),
-                        AvroReadOptions::default()
-                            // TODO implement insert_mode for avro
-                            //.insert_mode(ListingTableInsertMode::AppendNewFiles)
-                            .schema(schema.as_ref()),
+                        AvroReadOptions::default().schema(schema.as_ref()),
                     )
                     .await?;
             }
@@ -2175,10 +1867,7 @@ mod tests {
                     .register_arrow(
                         "t",
                         tmp_dir.path().to_str().unwrap(),
-                        ArrowReadOptions::default()
-                            // TODO implement insert_mode for arrow
-                            //.insert_mode(ListingTableInsertMode::AppendNewFiles)
-                            .schema(schema.as_ref()),
+                        ArrowReadOptions::default().schema(schema.as_ref()),
                     )
                     .await?;
             }
diff --git a/datafusion/core/src/datasource/listing/url.rs b/datafusion/core/src/datasource/listing/url.rs
index 9197e37adb..ba3c3fae21 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -181,6 +181,11 @@ impl ListingTableUrl {
         }
     }
 
+    /// Returns `true` if `path` refers to a collection of objects
+    pub fn is_collection(&self) -> bool {
+        self.url.as_str().ends_with('/')
+    }
+
     /// Strips the prefix of this [`ListingTableUrl`] from the provided path, returning
     /// an iterator of the remaining path segments
     pub(crate) fn strip_prefix<'a, 'b: 'a>(
@@ -203,8 +208,7 @@ impl ListingTableUrl {
         file_extension: &'a str,
     ) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
         // If the prefix is a file, use a head request, otherwise list
-        let is_dir = self.url.as_str().ends_with('/');
-        let list = match is_dir {
+        let list = match self.is_collection() {
             true => match ctx.runtime_env().cache_manager.get_list_files_cache() {
                 None => futures::stream::once(store.list(Some(&self.prefix)))
                     .try_flatten()
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index f9a7ab04ce..543a3a83f7 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -21,8 +21,6 @@ use std::path::Path;
 use std::str::FromStr;
 use std::sync::Arc;
 
-use super::listing::ListingTableInsertMode;
-
 #[cfg(feature = "parquet")]
 use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::file_format::{
@@ -38,7 +36,7 @@ use crate::execution::context::SessionState;
 
 use arrow::datatypes::{DataType, SchemaRef};
 use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions};
-use datafusion_common::{DataFusionError, FileType};
+use datafusion_common::{plan_err, DataFusionError, FileType};
 use datafusion_expr::CreateExternalTable;
 
 use async_trait::async_trait;
@@ -149,19 +147,12 @@ impl TableProviderFactory for ListingTableFactory {
             .take_bool_option("single_file")?
             .unwrap_or(false);
 
-        let explicit_insert_mode = statement_options.take_str_option("insert_mode");
-        let insert_mode = match explicit_insert_mode {
-            Some(mode) => ListingTableInsertMode::from_str(mode.as_str()),
-            None => match file_type {
-                FileType::CSV => Ok(ListingTableInsertMode::AppendToFile),
-                #[cfg(feature = "parquet")]
-                FileType::PARQUET => Ok(ListingTableInsertMode::AppendNewFiles),
-                FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles),
-                FileType::JSON => Ok(ListingTableInsertMode::AppendToFile),
-                FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles),
-            },
-        }?;
-
+        // Backwards compatibility
+        if let Some(s) = statement_options.take_str_option("insert_mode") {
+            if !s.eq_ignore_ascii_case("append_new_files") {
+                return plan_err!("Unknown or unsupported insert mode {s}. Only append_to_file supported");
+            }
+        }
         let file_type = file_format.file_type();
 
         // Use remaining options and session state to build FileTypeWriterOptions
@@ -214,7 +205,6 @@ impl TableProviderFactory for ListingTableFactory {
             .with_target_partitions(state.config().target_partitions())
             .with_table_partition_cols(table_partition_cols)
             .with_file_sort_order(cmd.order_exprs.clone())
-            .with_insert_mode(insert_mode)
             .with_single_file(single_file)
             .with_write_options(file_type_writer_options)
             .with_infinite_source(unbounded);
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index ea0a9698ff..738e70966b 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -49,10 +49,7 @@ use std::{
 
 use super::listing::ListingTableUrl;
 use crate::error::{DataFusionError, Result};
-use crate::{
-    datasource::file_format::write::FileWriterMode,
-    physical_plan::{DisplayAs, DisplayFormatType},
-};
+use crate::physical_plan::{DisplayAs, DisplayFormatType};
 use crate::{
     datasource::{
         listing::{FileRange, PartitionedFile},
@@ -90,8 +87,6 @@ pub struct FileSinkConfig {
     /// A vector of column names and their corresponding data types,
     /// representing the partitioning columns for the file
     pub table_partition_cols: Vec<(String, DataType)>,
-    /// A writer mode that determines how data is written to the file
-    pub writer_mode: FileWriterMode,
     /// If true, it is assumed there is a single table_path which is a file to which all data should be written
     /// regardless of input partitioning. Otherwise, each table path is assumed to be a directory
     /// to which each output partition is written to its own output file.
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
index fc19ff954d..6965968b6f 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -171,7 +171,7 @@ impl StreamConfig {
         match &self.encoding {
             StreamEncoding::Csv => {
                 let header = self.header && !self.location.exists();
-                let file = OpenOptions::new().write(true).open(&self.location)?;
+                let file = OpenOptions::new().append(true).open(&self.location)?;
                 let writer = arrow::csv::WriterBuilder::new()
                     .with_header(header)
                     .build(file);
@@ -179,7 +179,7 @@ impl StreamConfig {
                 Ok(Box::new(writer))
             }
             StreamEncoding::Json => {
-                let file = OpenOptions::new().write(true).open(&self.location)?;
+                let file = OpenOptions::new().append(true).open(&self.location)?;
                 Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
             }
         }
@@ -298,7 +298,12 @@ struct StreamWrite(Arc<StreamConfig>);
 
 impl DisplayAs for StreamWrite {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
-        write!(f, "{self:?}")
+        f.debug_struct("StreamWrite")
+            .field("location", &self.0.location)
+            .field("batch_size", &self.0.batch_size)
+            .field("encoding", &self.0.encoding)
+            .field("header", &self.0.header)
+            .finish_non_exhaustive()
     }
 }
 
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 1f1ef73cae..82d96c98e6 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -27,7 +27,6 @@ use crate::datasource::file_format::csv::CsvFormat;
 use crate::datasource::file_format::json::JsonFormat;
 #[cfg(feature = "parquet")]
 use crate::datasource::file_format::parquet::ParquetFormat;
-use crate::datasource::file_format::write::FileWriterMode;
 use crate::datasource::file_format::FileFormat;
 use crate::datasource::listing::ListingTableUrl;
 use crate::datasource::physical_plan::FileSinkConfig;
@@ -591,7 +590,6 @@ impl DefaultPhysicalPlanner {
                         output_schema: Arc::new(schema),
                         table_partition_cols: vec![],
                         unbounded_input: false,
-                        writer_mode: FileWriterMode::PutMultipart,
                         single_file_output: *single_file_output,
                         overwrite: false,
                         file_type_writer_options
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index ad83ea1fce..750d12bd77 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1157,12 +1157,6 @@ message PhysicalPlanNode {
   }
 }
 
-enum FileWriterMode {
-  APPEND = 0;
-  PUT = 1;
-  PUT_MULTIPART = 2;
-}
-
 enum CompressionTypeVariant {
   GZIP = 0;
   BZIP2 = 1;
@@ -1187,12 +1181,13 @@ message JsonWriterOptions {
 }
 
 message FileSinkConfig {
+  reserved 6; // writer_mode
+
   string object_store_url = 1;
   repeated PartitionedFile file_groups = 2;
   repeated string table_paths = 3;
   Schema output_schema = 4;
   repeated PartitionColumn table_partition_cols = 5;
-  FileWriterMode writer_mode = 6;
   bool single_file_output = 7;
   bool unbounded_input = 8;
   bool overwrite = 9;
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 016719a600..af64bd68de 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -7471,9 +7471,6 @@ impl serde::Serialize for FileSinkConfig {
         if !self.table_partition_cols.is_empty() {
             len += 1;
         }
-        if self.writer_mode != 0 {
-            len += 1;
-        }
         if self.single_file_output {
             len += 1;
         }
@@ -7502,11 +7499,6 @@ impl serde::Serialize for FileSinkConfig {
         if !self.table_partition_cols.is_empty() {
             struct_ser.serialize_field("tablePartitionCols", &self.table_partition_cols)?;
         }
-        if self.writer_mode != 0 {
-            let v = FileWriterMode::try_from(self.writer_mode)
-                .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.writer_mode)))?;
-            struct_ser.serialize_field("writerMode", &v)?;
-        }
         if self.single_file_output {
             struct_ser.serialize_field("singleFileOutput", &self.single_file_output)?;
         }
@@ -7539,8 +7531,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             "outputSchema",
             "table_partition_cols",
             "tablePartitionCols",
-            "writer_mode",
-            "writerMode",
             "single_file_output",
             "singleFileOutput",
             "unbounded_input",
@@ -7557,7 +7547,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
             TablePaths,
             OutputSchema,
             TablePartitionCols,
-            WriterMode,
             SingleFileOutput,
             UnboundedInput,
             Overwrite,
@@ -7588,7 +7577,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             "tablePaths" | "table_paths" => Ok(GeneratedField::TablePaths),
                             "outputSchema" | "output_schema" => Ok(GeneratedField::OutputSchema),
                             "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols),
-                            "writerMode" | "writer_mode" => Ok(GeneratedField::WriterMode),
                             "singleFileOutput" | "single_file_output" => Ok(GeneratedField::SingleFileOutput),
                             "unboundedInput" | "unbounded_input" => Ok(GeneratedField::UnboundedInput),
                             "overwrite" => Ok(GeneratedField::Overwrite),
@@ -7617,7 +7605,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                 let mut table_paths__ = None;
                 let mut output_schema__ = None;
                 let mut table_partition_cols__ = None;
-                let mut writer_mode__ = None;
                 let mut single_file_output__ = None;
                 let mut unbounded_input__ = None;
                 let mut overwrite__ = None;
@@ -7654,12 +7641,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                             }
                             table_partition_cols__ = Some(map_.next_value()?);
                         }
-                        GeneratedField::WriterMode => {
-                            if writer_mode__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("writerMode"));
-                            }
-                            writer_mode__ = Some(map_.next_value::<FileWriterMode>()? as i32);
-                        }
                         GeneratedField::SingleFileOutput => {
                             if single_file_output__.is_some() {
                                 return Err(serde::de::Error::duplicate_field("singleFileOutput"));
@@ -7692,7 +7673,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
                     table_paths: table_paths__.unwrap_or_default(),
                     output_schema: output_schema__,
                     table_partition_cols: table_partition_cols__.unwrap_or_default(),
-                    writer_mode: writer_mode__.unwrap_or_default(),
                     single_file_output: single_file_output__.unwrap_or_default(),
                     unbounded_input: unbounded_input__.unwrap_or_default(),
                     overwrite: overwrite__.unwrap_or_default(),
@@ -7800,80 +7780,6 @@ impl<'de> serde::Deserialize<'de> for FileTypeWriterOptions {
         deserializer.deserialize_struct("datafusion.FileTypeWriterOptions", FIELDS, GeneratedVisitor)
     }
 }
-impl serde::Serialize for FileWriterMode {
-    #[allow(deprecated)]
-    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        let variant = match self {
-            Self::Append => "APPEND",
-            Self::Put => "PUT",
-            Self::PutMultipart => "PUT_MULTIPART",
-        };
-        serializer.serialize_str(variant)
-    }
-}
-impl<'de> serde::Deserialize<'de> for FileWriterMode {
-    #[allow(deprecated)]
-    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
-    where
-        D: serde::Deserializer<'de>,
-    {
-        const FIELDS: &[&str] = &[
-            "APPEND",
-            "PUT",
-            "PUT_MULTIPART",
-        ];
-
-        struct GeneratedVisitor;
-
-        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
-            type Value = FileWriterMode;
-
-            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-                write!(formatter, "expected one of: {:?}", &FIELDS)
-            }
-
-            fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
-            where
-                E: serde::de::Error,
-            {
-                i32::try_from(v)
-                    .ok()
-                    .and_then(|x| x.try_into().ok())
-                    .ok_or_else(|| {
-                        serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self)
-                    })
-            }
-
-            fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
-            where
-                E: serde::de::Error,
-            {
-                i32::try_from(v)
-                    .ok()
-                    .and_then(|x| x.try_into().ok())
-                    .ok_or_else(|| {
-                        serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self)
-                    })
-            }
-
-            fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
-            where
-                E: serde::de::Error,
-            {
-                match value {
-                    "APPEND" => Ok(FileWriterMode::Append),
-                    "PUT" => Ok(FileWriterMode::Put),
-                    "PUT_MULTIPART" => Ok(FileWriterMode::PutMultipart),
-                    _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
-                }
-            }
-        }
-        deserializer.deserialize_any(GeneratedVisitor)
-    }
-}
 impl serde::Serialize for FilterExecNode {
     #[allow(deprecated)]
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 647f814fda..b23f09e91b 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1615,8 +1615,6 @@ pub struct FileSinkConfig {
     pub output_schema: ::core::option::Option<Schema>,
     #[prost(message, repeated, tag = "5")]
     pub table_partition_cols: ::prost::alloc::vec::Vec<PartitionColumn>,
-    #[prost(enumeration = "FileWriterMode", tag = "6")]
-    pub writer_mode: i32,
     #[prost(bool, tag = "7")]
     pub single_file_output: bool,
     #[prost(bool, tag = "8")]
@@ -3200,35 +3198,6 @@ impl UnionMode {
 }
 #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
 #[repr(i32)]
-pub enum FileWriterMode {
-    Append = 0,
-    Put = 1,
-    PutMultipart = 2,
-}
-impl FileWriterMode {
-    /// String value of the enum field names used in the ProtoBuf definition.
-    ///
-    /// The values are not transformed in any way and thus are considered stable
-    /// (if the ProtoBuf definition does not change) and safe for programmatic use.
-    pub fn as_str_name(&self) -> &'static str {
-        match self {
-            FileWriterMode::Append => "APPEND",
-            FileWriterMode::Put => "PUT",
-            FileWriterMode::PutMultipart => "PUT_MULTIPART",
-        }
-    }
-    /// Creates an enum from field names used in the ProtoBuf definition.
-    pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
-        match value {
-            "APPEND" => Some(Self::Append),
-            "PUT" => Some(Self::Put),
-            "PUT_MULTIPART" => Some(Self::PutMultipart),
-            _ => None,
-        }
-    }
-}
-#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
-#[repr(i32)]
 pub enum CompressionTypeVariant {
     Gzip = 0,
     Bzip2 = 1,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 22b74db9af..f5771ddb15 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -23,7 +23,6 @@ use std::sync::Arc;
 use arrow::compute::SortOptions;
 use datafusion::arrow::datatypes::Schema;
 use datafusion::datasource::file_format::json::JsonSink;
-use datafusion::datasource::file_format::write::FileWriterMode;
 use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
@@ -739,7 +738,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
             table_paths,
             output_schema: Arc::new(convert_required!(conf.output_schema)?),
             table_partition_cols,
-            writer_mode: conf.writer_mode().into(),
             single_file_output: conf.single_file_output,
             unbounded_input: conf.unbounded_input,
             overwrite: conf.overwrite,
@@ -748,16 +746,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
     }
 }
 
-impl From<protobuf::FileWriterMode> for FileWriterMode {
-    fn from(value: protobuf::FileWriterMode) -> Self {
-        match value {
-            protobuf::FileWriterMode::Append => Self::Append,
-            protobuf::FileWriterMode::Put => Self::Put,
-            protobuf::FileWriterMode::PutMultipart => Self::PutMultipart,
-        }
-    }
-}
-
 impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
     fn from(value: protobuf::CompressionTypeVariant) -> Self {
         match value {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index b8a590b0dc..44864be947 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -31,7 +31,6 @@ use datafusion::datasource::{
     file_format::json::JsonSink, physical_plan::FileScanConfig,
 };
 use datafusion::datasource::{
-    file_format::write::FileWriterMode,
     listing::{FileRange, PartitionedFile},
     physical_plan::FileSinkConfig,
 };
@@ -819,7 +818,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
     type Error = DataFusionError;
 
     fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
-        let writer_mode: protobuf::FileWriterMode = conf.writer_mode.into();
         let file_groups = conf
             .file_groups
             .iter()
@@ -847,7 +845,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
             table_paths,
             output_schema: Some(conf.output_schema.as_ref().try_into()?),
             table_partition_cols,
-            writer_mode: writer_mode.into(),
             single_file_output: conf.single_file_output,
             unbounded_input: conf.unbounded_input,
             overwrite: conf.overwrite,
@@ -856,16 +853,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
     }
 }
 
-impl From<FileWriterMode> for protobuf::FileWriterMode {
-    fn from(value: FileWriterMode) -> Self {
-        match value {
-            FileWriterMode::Append => Self::Append,
-            FileWriterMode::Put => Self::Put,
-            FileWriterMode::PutMultipart => Self::PutMultipart,
-        }
-    }
-}
-
 impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
     fn from(value: &CompressionTypeVariant) -> Self {
         match value {
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 076ca41581..23b0ea43c7 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -22,7 +22,6 @@ use datafusion::arrow::array::ArrayRef;
 use datafusion::arrow::compute::kernels::sort::SortOptions;
 use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema};
 use datafusion::datasource::file_format::json::JsonSink;
-use datafusion::datasource::file_format::write::FileWriterMode;
 use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
@@ -732,7 +731,6 @@ fn roundtrip_json_sink() -> Result<()> {
         table_paths: vec![ListingTableUrl::parse("file:///")?],
         output_schema: schema.clone(),
         table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
-        writer_mode: FileWriterMode::Put,
         single_file_output: true,
         unbounded_input: false,
         overwrite: true,
diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt
index 6e4a711a01..fbf1523477 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -32,7 +32,7 @@ logical_plan
 CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_output=false options: (compression 'zstd(10)')
 --TableScan: source_table projection=[col1, col2]
 physical_plan
-FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=ParquetSink(file_groups=[])
 --MemoryExec: partitions=1, partition_sizes=[1]
 
 # Error case
diff --git a/datafusion/sqllogictest/test_files/errors.slt b/datafusion/sqllogictest/test_files/errors.slt
index 4aded8a576..e3b2610e51 100644
--- a/datafusion/sqllogictest/test_files/errors.slt
+++ b/datafusion/sqllogictest/test_files/errors.slt
@@ -133,4 +133,4 @@ order by c9
 
 
 statement error Inconsistent data type across values list at row 1 column 0. Was Int64 but found Utf8
-create table foo as values (1), ('foo');
\ No newline at end of file
+create table foo as values (1), ('foo');
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 9726c35a31..129814767c 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -140,7 +140,7 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te
 # create a sink table, path is same with aggregate_test_100 table
 # we do not overwrite this file, we only assert plan.
 statement ok
-CREATE EXTERNAL TABLE sink_table (
+CREATE UNBOUNDED EXTERNAL TABLE sink_table (
         c1  VARCHAR NOT NULL,
         c2  TINYINT NOT NULL,
         c3  SMALLINT NOT NULL,
@@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table]
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
 physical_plan
-FileSinkExec: sink=CsvSink(writer_mode=Append, file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv])
+FileSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. }
 --SortExec: expr=[c1@0 ASC NULLS LAST]
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
 
diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt
index 9860bdcae0..a100b5ac6b 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -289,7 +289,7 @@ insert into table_without_values values(2, NULL);
 ----
 1
 
-# insert NULL values for the missing column (field2) 
+# insert NULL values for the missing column (field2)
 query II
 insert into table_without_values(field1) values(3);
 ----
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 4441036241..39323479ff 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -100,7 +100,7 @@ Dml: op=[Insert Into] table=[ordered_insert_test]
 --Projection: column1 AS a, column2 AS b
 ----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))...
 physical_plan
-FileSinkExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=CsvSink(file_groups=[])
 --SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]
 ----ProjectionExec: expr=[column1@0 as a, column2@1 as b]
 ------ValuesExec
@@ -254,6 +254,22 @@ create_local_path 'true',
 single_file 'true',
 );
 
+query error DataFusion error: Error during planning: Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`\. To append to an existing file use StreamTable, e\.g\. by using CREATE UNBOUNDED EXTERNAL TABLE
+INSERT INTO single_file_test values (1, 2), (3, 4);
+
+statement ok
+drop table single_file_test;
+
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE
+single_file_test(a bigint, b bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/single_csv_table.csv'
+OPTIONS(
+create_local_path 'true',
+single_file 'true',
+);
+
 query II
 INSERT INTO single_file_test values (1, 2), (3, 4);
 ----
@@ -315,7 +331,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 --------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
-FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=ParquetSink(file_groups=[])
 --ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
 ----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
 ------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PAR [...]
@@ -378,7 +394,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 ----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
 ------TableScan: aggregate_test_100 projection=[c1, c4, c9]
 physical_plan
-FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=ParquetSink(file_groups=[])
 --CoalescePartitionsExec
 ----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
 ------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound [...]
@@ -422,7 +438,7 @@ Dml: op=[Insert Into] table=[table_without_values]
 ----Sort: aggregate_test_100.c1 ASC NULLS LAST
 ------TableScan: aggregate_test_100 projection=[c1]
 physical_plan
-FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=ParquetSink(file_groups=[])
 --SortExec: expr=[c1@0 ASC NULLS LAST]
 ----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
 
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 737b43b5a9..0fea8da5a3 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3486,4 +3486,3 @@ set datafusion.optimizer.prefer_existing_sort = false;
 
 statement ok
 drop table annotated_data;
-
diff --git a/datafusion/sqllogictest/test_files/options.slt b/datafusion/sqllogictest/test_files/options.slt
index 83fe85745e..9366a9b3b3 100644
--- a/datafusion/sqllogictest/test_files/options.slt
+++ b/datafusion/sqllogictest/test_files/options.slt
@@ -84,7 +84,7 @@ statement ok
 drop table a
 
 # test datafusion.sql_parser.parse_float_as_decimal
-# 
+#
 # default option value is false
 query RR
 select 10000000000000000000.01, -10000000000000000000.01
@@ -209,5 +209,3 @@ select -123456789.0123456789012345678901234567890
 # Restore option to default value
 statement ok
 set datafusion.sql_parser.parse_float_as_decimal = false;
-
-
diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt
index 8148f1c4c7..9c5d1704f4 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -447,7 +447,7 @@ statement ok
 drop table multiple_ordered_table;
 
 # Create tables having some ordered columns. In the next step, we will expect to observe that scalar
-# functions, such as mathematical functions like atan(), ceil(), sqrt(), or date_time functions 
+# functions, such as mathematical functions like atan(), ceil(), sqrt(), or date_time functions
 # like date_bin() and date_trunc(), will maintain the order of its argument columns.
 statement ok
 CREATE EXTERNAL TABLE csv_with_timestamps (
diff --git a/datafusion/sqllogictest/test_files/predicates.slt b/datafusion/sqllogictest/test_files/predicates.slt
index d22b2ff953..e992a440d0 100644
--- a/datafusion/sqllogictest/test_files/predicates.slt
+++ b/datafusion/sqllogictest/test_files/predicates.slt
@@ -495,6 +495,7 @@ set datafusion.execution.parquet.bloom_filter_enabled=true;
 
 query T
 SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'foo';
+----
 
 query T
 SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'test';
diff --git a/datafusion/sqllogictest/test_files/set_variable.slt b/datafusion/sqllogictest/test_files/set_variable.slt
index 714e1e995e..440fb2c6ef 100644
--- a/datafusion/sqllogictest/test_files/set_variable.slt
+++ b/datafusion/sqllogictest/test_files/set_variable.slt
@@ -243,4 +243,4 @@ statement ok
 SET TIME ZONE = 'Asia/Taipei2'
 
 statement error Arrow error: Parser error: Invalid timezone "Asia/Taipei2": 'Asia/Taipei2' is not a valid timezone
-SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ
\ No newline at end of file
+SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ
diff --git a/datafusion/sqllogictest/test_files/update.slt b/datafusion/sqllogictest/test_files/update.slt
index c88082fc72..6412c3ca85 100644
--- a/datafusion/sqllogictest/test_files/update.slt
+++ b/datafusion/sqllogictest/test_files/update.slt
@@ -89,4 +89,4 @@ Dml: op=[Update] table=[t1]
 ------CrossJoin:
 --------SubqueryAlias: t
 ----------TableScan: t1
---------TableScan: t2
\ No newline at end of file
+--------TableScan: t2

