This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 325a3fbe76 Remove FileWriterMode and ListingTableInsertMode (#7994) (#8017)
325a3fbe76 is described below
commit 325a3fbe7623d3df0ab64867545c4d93a0c96015
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Nov 17 22:14:27 2023 +0000
Remove FileWriterMode and ListingTableInsertMode (#7994) (#8017)
* Remove FileWriterMode Support (#7994)
* Don't ignore test
* Error on insert to single file
* Improve DisplayAs
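
The net effect, sketched below for illustration (this snippet is not part of the diff; the SessionContext calls, table names, paths and schemas are assumed placeholders): inserts into a ListingTable now always write new files via multipart puts, and inserting into a table backed by a single file is rejected in favour of StreamTable.

    use datafusion::error::Result;
    use datafusion::prelude::{CsvReadOptions, SessionContext};

    async fn insert_behaviour() -> Result<()> {
        let ctx = SessionContext::new();

        // A directory URL (note the trailing `/`) is a collection: INSERT INTO
        // always writes new files via multipart puts; there is no
        // `insert_mode` option or `with_insert_mode` builder any more.
        ctx.register_csv("t", "/tmp/table_dir/", CsvReadOptions::new()).await?;
        let _ = ctx.sql("INSERT INTO t VALUES (1), (2), (3)").await?.collect().await?;

        // A single-file table can still be read, but inserting into it is now
        // rejected; appending to one file is the job of a StreamTable
        // (e.g. CREATE UNBOUNDED EXTERNAL TABLE).
        ctx.register_csv("single", "/tmp/one_file.csv", CsvReadOptions::new()).await?;
        let df = ctx.sql("INSERT INTO single VALUES (4)").await?;
        assert!(df.collect().await.is_err());
        Ok(())
    }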
---
datafusion/core/src/datasource/file_format/csv.rs | 74 +----
datafusion/core/src/datasource/file_format/json.rs | 59 +---
.../core/src/datasource/file_format/options.rs | 32 +-
.../core/src/datasource/file_format/parquet.rs | 67 ++--
.../core/src/datasource/file_format/write/mod.rs | 204 ++----------
.../datasource/file_format/write/orchestration.rs | 111 +------
datafusion/core/src/datasource/listing/mod.rs | 4 +-
datafusion/core/src/datasource/listing/table.rs | 343 +--------------------
datafusion/core/src/datasource/listing/url.rs | 8 +-
.../core/src/datasource/listing_table_factory.rs | 24 +-
.../core/src/datasource/physical_plan/mod.rs | 7 +-
datafusion/core/src/datasource/stream.rs | 11 +-
datafusion/core/src/physical_planner.rs | 2 -
datafusion/proto/proto/datafusion.proto | 9 +-
datafusion/proto/src/generated/pbjson.rs | 94 ------
datafusion/proto/src/generated/prost.rs | 31 --
datafusion/proto/src/physical_plan/from_proto.rs | 12 -
datafusion/proto/src/physical_plan/to_proto.rs | 13 -
.../proto/tests/cases/roundtrip_physical_plan.rs | 2 -
datafusion/sqllogictest/test_files/copy.slt | 2 +-
datafusion/sqllogictest/test_files/errors.slt | 2 +-
datafusion/sqllogictest/test_files/explain.slt | 4 +-
datafusion/sqllogictest/test_files/insert.slt | 2 +-
.../sqllogictest/test_files/insert_to_external.slt | 24 +-
datafusion/sqllogictest/test_files/joins.slt | 1 -
datafusion/sqllogictest/test_files/options.slt | 4 +-
datafusion/sqllogictest/test_files/order.slt | 2 +-
datafusion/sqllogictest/test_files/predicates.slt | 1 +
.../sqllogictest/test_files/set_variable.slt | 2 +-
datafusion/sqllogictest/test_files/update.slt | 2 +-
30 files changed, 127 insertions(+), 1026 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/csv.rs
b/datafusion/core/src/datasource/file_format/csv.rs
index 5f2084bc80..684f416f77 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -34,10 +34,10 @@ use futures::stream::BoxStream;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use object_store::{delimited::newline_delimited_stream, ObjectMeta,
ObjectStore};
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
use super::{FileFormat, DEFAULT_SCHEMA_INFER_MAX_RECORD};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::physical_plan::{
CsvExec, FileGroupDisplay, FileScanConfig, FileSinkConfig,
};
@@ -465,11 +465,7 @@ impl DisplayAs for CsvSink {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(
- f,
- "CsvSink(writer_mode={:?}, file_groups=",
- self.config.writer_mode
- )?;
+ write!(f, "CsvSink(file_groups=",)?;
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
write!(f, ")")
}
@@ -481,55 +477,6 @@ impl CsvSink {
fn new(config: FileSinkConfig) -> Self {
Self { config }
}
-
- async fn append_all(
- &self,
- data: SendableRecordBatchStream,
- context: &Arc<TaskContext>,
- ) -> Result<u64> {
- if !self.config.table_partition_cols.is_empty() {
- return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
- }
- let writer_options =
self.config.file_type_writer_options.try_into_csv()?;
- let (builder, compression) =
- (&writer_options.writer_options, &writer_options.compression);
- let compression = FileCompressionType::from(*compression);
-
- let object_store = context
- .runtime_env()
- .object_store(&self.config.object_store_url)?;
- let file_groups = &self.config.file_groups;
-
- let builder_clone = builder.clone();
- let options_clone = writer_options.clone();
- let get_serializer = move |file_size| {
- let inner_clone = builder_clone.clone();
- // In append mode, consider has_header flag only when file is empty (at the start).
- // For other modes, use has_header flag as is.
- let serializer: Box<dyn BatchSerializer> = Box::new(if file_size >
0 {
- CsvSerializer::new()
- .with_builder(inner_clone)
- .with_header(false)
- } else {
- CsvSerializer::new()
- .with_builder(inner_clone)
- .with_header(options_clone.writer_options.header())
- });
- serializer
- };
-
- stateless_append_all(
- data,
- context,
- object_store,
- file_groups,
- self.config.unbounded_input,
- compression,
- Box::new(get_serializer),
- )
- .await
- }
-
async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
@@ -577,19 +524,8 @@ impl DataSink for CsvSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
- match self.config.writer_mode {
- FileWriterMode::Append => {
- let total_count = self.append_all(data, context).await?;
- Ok(total_count)
- }
- FileWriterMode::PutMultipart => {
- let total_count = self.multipartput_all(data, context).await?;
- Ok(total_count)
- }
- FileWriterMode::Put => {
- return not_impl_err!("FileWriterMode::Put is not supported yet!")
- }
- }
+ let total_count = self.multipartput_all(data, context).await?;
+ Ok(total_count)
}
}
diff --git a/datafusion/core/src/datasource/file_format/json.rs
b/datafusion/core/src/datasource/file_format/json.rs
index 8d62d0a858..9893a1db45 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -45,10 +45,10 @@ use crate::physical_plan::insert::FileSinkExec;
use crate::physical_plan::SendableRecordBatchStream;
use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
-use super::write::orchestration::{stateless_append_all, stateless_multipart_put};
+use super::write::orchestration::stateless_multipart_put;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::file_format::write::{BatchSerializer, FileWriterMode};
+use crate::datasource::file_format::write::BatchSerializer;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
use crate::datasource::physical_plan::{FileSinkConfig, NdJsonExec};
use crate::error::Result;
@@ -245,11 +245,7 @@ impl DisplayAs for JsonSink {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(
- f,
- "JsonSink(writer_mode={:?}, file_groups=",
- self.config.writer_mode
- )?;
+ write!(f, "JsonSink(file_groups=",)?;
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
write!(f, ")")
}
@@ -268,40 +264,6 @@ impl JsonSink {
&self.config
}
- async fn append_all(
- &self,
- data: SendableRecordBatchStream,
- context: &Arc<TaskContext>,
- ) -> Result<u64> {
- if !self.config.table_partition_cols.is_empty() {
- return Err(DataFusionError::NotImplemented("Inserting in append mode to hive style partitioned tables is not supported".into()));
- }
-
- let writer_options =
self.config.file_type_writer_options.try_into_json()?;
- let compression = &writer_options.compression;
-
- let object_store = context
- .runtime_env()
- .object_store(&self.config.object_store_url)?;
- let file_groups = &self.config.file_groups;
-
- let get_serializer = move |_| {
- let serializer: Box<dyn BatchSerializer> =
Box::new(JsonSerializer::new());
- serializer
- };
-
- stateless_append_all(
- data,
- context,
- object_store,
- file_groups,
- self.config.unbounded_input,
- (*compression).into(),
- Box::new(get_serializer),
- )
- .await
- }
-
async fn multipartput_all(
&self,
data: SendableRecordBatchStream,
@@ -342,19 +304,8 @@ impl DataSink for JsonSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
- match self.config.writer_mode {
- FileWriterMode::Append => {
- let total_count = self.append_all(data, context).await?;
- Ok(total_count)
- }
- FileWriterMode::PutMultipart => {
- let total_count = self.multipartput_all(data, context).await?;
- Ok(total_count)
- }
- FileWriterMode::Put => {
- return not_impl_err!("FileWriterMode::Put is not supported yet!")
- }
- }
+ let total_count = self.multipartput_all(data, context).await?;
+ Ok(total_count)
}
}
diff --git a/datafusion/core/src/datasource/file_format/options.rs
b/datafusion/core/src/datasource/file_format/options.rs
index 41a70e6d2f..4c7557a4a9 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -28,7 +28,7 @@ use
crate::datasource::file_format::file_compression_type::FileCompressionType;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
-use crate::datasource::listing::{ListingTableInsertMode, ListingTableUrl};
+use crate::datasource::listing::ListingTableUrl;
use crate::datasource::{
file_format::{avro::AvroFormat, csv::CsvFormat, json::JsonFormat},
listing::ListingOptions,
@@ -76,8 +76,6 @@ pub struct CsvReadOptions<'a> {
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
- /// Setting controls how inserts to this file should be handled
- pub insert_mode: ListingTableInsertMode,
}
impl<'a> Default for CsvReadOptions<'a> {
@@ -101,7 +99,6 @@ impl<'a> CsvReadOptions<'a> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
- insert_mode: ListingTableInsertMode::AppendToFile,
}
}
@@ -184,12 +181,6 @@ impl<'a> CsvReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}
-
- /// Configure how insertions to this table should be handled
- pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
- self.insert_mode = insert_mode;
- self
- }
}
/// Options that control the reading of Parquet files.
@@ -219,8 +210,6 @@ pub struct ParquetReadOptions<'a> {
pub schema: Option<&'a Schema>,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
- /// Setting controls how inserts to this file should be handled
- pub insert_mode: ListingTableInsertMode,
}
impl<'a> Default for ParquetReadOptions<'a> {
@@ -232,7 +221,6 @@ impl<'a> Default for ParquetReadOptions<'a> {
skip_metadata: None,
schema: None,
file_sort_order: vec![],
- insert_mode: ListingTableInsertMode::AppendNewFiles,
}
}
}
@@ -272,12 +260,6 @@ impl<'a> ParquetReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}
-
- /// Configure how insertions to this table should be handled
- pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
- self.insert_mode = insert_mode;
- self
- }
}
/// Options that control the reading of ARROW files.
@@ -403,8 +385,6 @@ pub struct NdJsonReadOptions<'a> {
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<Expr>>,
- /// Setting controls how inserts to this file should be handled
- pub insert_mode: ListingTableInsertMode,
}
impl<'a> Default for NdJsonReadOptions<'a> {
@@ -417,7 +397,6 @@ impl<'a> Default for NdJsonReadOptions<'a> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
- insert_mode: ListingTableInsertMode::AppendToFile,
}
}
}
@@ -464,12 +443,6 @@ impl<'a> NdJsonReadOptions<'a> {
self.file_sort_order = file_sort_order;
self
}
-
- /// Configure how insertions to this table should be handled
- pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
- self.insert_mode = insert_mode;
- self
- }
}
#[async_trait]
@@ -528,7 +501,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
.with_infinite_source(self.infinite)
- .with_insert_mode(self.insert_mode.clone())
}
async fn get_resolved_schema(
@@ -555,7 +527,6 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
.with_target_partitions(config.target_partitions())
.with_table_partition_cols(self.table_partition_cols.clone())
.with_file_sort_order(self.file_sort_order.clone())
- .with_insert_mode(self.insert_mode.clone())
}
async fn get_resolved_schema(
@@ -582,7 +553,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
.with_table_partition_cols(self.table_partition_cols.clone())
.with_infinite_source(self.infinite)
.with_file_sort_order(self.file_sort_order.clone())
- .with_insert_mode(self.insert_mode.clone())
}
async fn get_resolved_schema(
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs
b/datafusion/core/src/datasource/file_format/parquet.rs
index 2cba474e55..c4d05adfc6 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -40,11 +40,12 @@ use crate::datasource::statistics::{create_max_min_accs,
get_col_stats};
use arrow::datatypes::SchemaRef;
use arrow::datatypes::{Fields, Schema};
use bytes::{BufMut, BytesMut};
-use datafusion_common::{exec_err, not_impl_err, plan_err, DataFusionError,
FileType};
+use datafusion_common::{exec_err, not_impl_err, DataFusionError, FileType};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortRequirement};
use futures::{StreamExt, TryStreamExt};
use hashbrown::HashMap;
+use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::{
arrow_to_parquet_schema, parquet_to_arrow_schema, AsyncArrowWriter,
@@ -55,7 +56,7 @@ use parquet::file::properties::WriterProperties;
use parquet::file::statistics::Statistics as ParquetStatistics;
use super::write::demux::start_demuxer_task;
-use super::write::{create_writer, AbortableWrite, FileWriterMode};
+use super::write::{create_writer, AbortableWrite};
use super::{FileFormat, FileScanConfig};
use crate::arrow::array::{
BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
@@ -64,7 +65,7 @@ use crate::arrow::datatypes::DataType;
use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{
- FileGroupDisplay, FileMeta, FileSinkConfig, ParquetExec, SchemaAdapter,
+ FileGroupDisplay, FileSinkConfig, ParquetExec, SchemaAdapter,
};
use crate::error::Result;
use crate::execution::context::SessionState;
@@ -596,11 +597,7 @@ impl DisplayAs for ParquetSink {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
- write!(
- f,
- "ParquetSink(writer_mode={:?}, file_groups=",
- self.config.writer_mode
- )?;
+ write!(f, "ParquetSink(file_groups=",)?;
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
write!(f, ")")
}
@@ -642,36 +639,23 @@ impl ParquetSink {
/// AsyncArrowWriters are used when individual parquet file serialization
is not parallelized
async fn create_async_arrow_writer(
&self,
- file_meta: FileMeta,
+ location: &Path,
object_store: Arc<dyn ObjectStore>,
parquet_props: WriterProperties,
) -> Result<
AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send +
Unpin>>,
> {
- let object = &file_meta.object_meta;
- match self.config.writer_mode {
- FileWriterMode::Append => {
- plan_err!(
- "Appending to Parquet files is not supported by the file format!"
- )
- }
- FileWriterMode::Put => {
- not_impl_err!("FileWriterMode::Put is not implemented for ParquetSink")
- }
- FileWriterMode::PutMultipart => {
- let (_, multipart_writer) = object_store
- .put_multipart(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- let writer = AsyncArrowWriter::try_new(
- multipart_writer,
- self.get_writer_schema(),
- 10485760,
- Some(parquet_props),
- )?;
- Ok(writer)
- }
- }
+ let (_, multipart_writer) = object_store
+ .put_multipart(location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ let writer = AsyncArrowWriter::try_new(
+ multipart_writer,
+ self.get_writer_schema(),
+ 10485760,
+ Some(parquet_props),
+ )?;
+ Ok(writer)
}
}
@@ -730,13 +714,7 @@ impl DataSink for ParquetSink {
if !allow_single_file_parallelism {
let mut writer = self
.create_async_arrow_writer(
- ObjectMeta {
- location: path,
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- }
- .into(),
+ &path,
object_store.clone(),
parquet_props.clone(),
)
@@ -752,17 +730,10 @@ impl DataSink for ParquetSink {
});
} else {
let writer = create_writer(
- FileWriterMode::PutMultipart,
// Parquet files as a whole are never compressed, since
they
// manage compressed blocks themselves.
FileCompressionType::UNCOMPRESSED,
- ObjectMeta {
- location: path,
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- }
- .into(),
+ &path,
object_store.clone(),
)
.await?;
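
With the writer-mode match gone, ParquetSink has a single path: an AsyncArrowWriter layered over an object store multipart upload. A standalone sketch of that pattern (illustrative only; the helper name and surrounding plumbing are assumptions, the crate calls are the ones used in the hunk above):

    use std::sync::Arc;

    use datafusion::arrow::datatypes::SchemaRef;
    use datafusion::arrow::record_batch::RecordBatch;
    use datafusion::error::Result;
    use object_store::{path::Path, ObjectStore};
    use parquet::arrow::AsyncArrowWriter;
    use parquet::file::properties::WriterProperties;

    // Open a multipart upload and stream batches into it, mirroring
    // ParquetSink::create_async_arrow_writer above.
    async fn write_parquet_object(
        store: Arc<dyn ObjectStore>,
        location: &Path,
        schema: SchemaRef,
        batches: Vec<RecordBatch>,
        props: WriterProperties,
    ) -> Result<()> {
        let (_id, multipart_writer) = store.put_multipart(location).await?;
        // 10 MiB buffer, the same constant used by the sink.
        let mut writer =
            AsyncArrowWriter::try_new(multipart_writer, schema, 10485760, Some(props))?;
        for batch in &batches {
            writer.write(batch).await?;
        }
        writer.close().await?; // completes the multipart upload
        Ok(())
    }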
diff --git a/datafusion/core/src/datasource/file_format/write/mod.rs
b/datafusion/core/src/datasource/file_format/write/mod.rs
index 770c7a49c3..cfcdbd8c46 100644
--- a/datafusion/core/src/datasource/file_format/write/mod.rs
+++ b/datafusion/core/src/datasource/file_format/write/mod.rs
@@ -19,128 +19,32 @@
//! write support for the various file formats
use std::io::Error;
-use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::physical_plan::FileMeta;
use crate::error::Result;
use arrow_array::RecordBatch;
-use datafusion_common::{exec_err, DataFusionError};
+use datafusion_common::DataFusionError;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::BoxFuture;
-use futures::ready;
-use futures::FutureExt;
use object_store::path::Path;
-use object_store::{MultipartId, ObjectMeta, ObjectStore};
+use object_store::{MultipartId, ObjectStore};
use tokio::io::AsyncWrite;
pub(crate) mod demux;
pub(crate) mod orchestration;
-/// `AsyncPutWriter` is an object that facilitates asynchronous writing to
object stores.
-/// It is specifically designed for the `object_store` crate's `put` method
and sends
-/// whole bytes at once when the buffer is flushed.
-pub struct AsyncPutWriter {
- /// Object metadata
- object_meta: ObjectMeta,
- /// A shared reference to the object store
- store: Arc<dyn ObjectStore>,
- /// A buffer that stores the bytes to be sent
- current_buffer: Vec<u8>,
- /// Used for async handling in flush method
- inner_state: AsyncPutState,
-}
-
-impl AsyncPutWriter {
- /// Constructor for the `AsyncPutWriter` object
- pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
- Self {
- object_meta,
- store,
- current_buffer: vec![],
- // The writer starts out in buffering mode
- inner_state: AsyncPutState::Buffer,
- }
- }
-
- /// Separate implementation function that unpins the [`AsyncPutWriter`] so
- /// that partial borrows work correctly
- fn poll_shutdown_inner(
- &mut self,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- loop {
- match &mut self.inner_state {
- AsyncPutState::Buffer => {
- // Convert the current buffer to bytes and take ownership
of it
- let bytes = Bytes::from(mem::take(&mut
self.current_buffer));
- // Set the inner state to Put variant with the bytes
- self.inner_state = AsyncPutState::Put { bytes }
- }
- AsyncPutState::Put { bytes } => {
- // Send the bytes to the object store's put method
- return Poll::Ready(
- ready!(self
- .store
- .put(&self.object_meta.location, bytes.clone())
- .poll_unpin(cx))
- .map_err(Error::from),
- );
- }
- }
- }
- }
-}
-
-/// An enum that represents the inner state of AsyncPut
-enum AsyncPutState {
- /// Building Bytes struct in this state
- Buffer,
- /// Data in the buffer is being sent to the object store
- Put { bytes: Bytes },
-}
-
-impl AsyncWrite for AsyncPutWriter {
- // Define the implementation of the AsyncWrite trait for the
`AsyncPutWriter` struct
- fn poll_write(
- mut self: Pin<&mut Self>,
- _: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<std::result::Result<usize, Error>> {
- // Extend the current buffer with the incoming buffer
- self.current_buffer.extend_from_slice(buf);
- // Return a ready poll with the length of the incoming buffer
- Poll::Ready(Ok(buf.len()))
- }
-
- fn poll_flush(
- self: Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- // Return a ready poll with an empty result
- Poll::Ready(Ok(()))
- }
-
- fn poll_shutdown(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- ) -> Poll<std::result::Result<(), Error>> {
- // Call the poll_shutdown_inner method to handle the actual sending of
data to the object store
- self.poll_shutdown_inner(cx)
- }
-}
-
/// Stores data needed during abortion of MultiPart writers
+#[derive(Clone)]
pub(crate) struct MultiPart {
/// A shared reference to the object store
store: Arc<dyn ObjectStore>,
@@ -163,45 +67,28 @@ impl MultiPart {
}
}
-pub(crate) enum AbortMode {
- Put,
- Append,
- MultiPart(MultiPart),
-}
-
/// A wrapper struct with abort method and writer
pub(crate) struct AbortableWrite<W: AsyncWrite + Unpin + Send> {
writer: W,
- mode: AbortMode,
+ multipart: MultiPart,
}
impl<W: AsyncWrite + Unpin + Send> AbortableWrite<W> {
/// Create a new `AbortableWrite` instance with the given writer, and
write mode.
- pub(crate) fn new(writer: W, mode: AbortMode) -> Self {
- Self { writer, mode }
+ pub(crate) fn new(writer: W, multipart: MultiPart) -> Self {
+ Self { writer, multipart }
}
/// handling of abort for different write modes
pub(crate) fn abort_writer(&self) -> Result<BoxFuture<'static,
Result<()>>> {
- match &self.mode {
- AbortMode::Put => Ok(async { Ok(()) }.boxed()),
- AbortMode::Append => exec_err!("Cannot abort in append mode"),
- AbortMode::MultiPart(MultiPart {
- store,
- multipart_id,
- location,
- }) => {
- let location = location.clone();
- let multipart_id = multipart_id.clone();
- let store = store.clone();
- Ok(Box::pin(async move {
- store
- .abort_multipart(&location, &multipart_id)
- .await
- .map_err(DataFusionError::ObjectStore)
- }))
- }
- }
+ let multi = self.multipart.clone();
+ Ok(Box::pin(async move {
+ multi
+ .store
+ .abort_multipart(&multi.location, &multi.multipart_id)
+ .await
+ .map_err(DataFusionError::ObjectStore)
+ }))
}
}
@@ -229,16 +116,6 @@ impl<W: AsyncWrite + Unpin + Send> AsyncWrite for
AbortableWrite<W> {
}
}
-/// An enum that defines different file writer modes.
-#[derive(Debug, Clone, Copy)]
-pub enum FileWriterMode {
- /// Data is appended to an existing file.
- Append,
- /// Data is written to a new file.
- Put,
- /// Data is written to a new file in multiple parts.
- PutMultipart,
-}
/// A trait that defines the methods required for a RecordBatch serializer.
#[async_trait]
pub trait BatchSerializer: Unpin + Send {
@@ -255,51 +132,16 @@ pub trait BatchSerializer: Unpin + Send {
/// Returns an [`AbortableWrite`] which writes to the given object store
location
/// with the specified compression
pub(crate) async fn create_writer(
- writer_mode: FileWriterMode,
file_compression_type: FileCompressionType,
- file_meta: FileMeta,
+ location: &Path,
object_store: Arc<dyn ObjectStore>,
) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
- let object = &file_meta.object_meta;
- match writer_mode {
- // If the mode is append, call the store's append method and return
wrapped in
- // a boxed trait object.
- FileWriterMode::Append => {
- let writer = object_store
- .append(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- let writer = AbortableWrite::new(
- file_compression_type.convert_async_writer(writer)?,
- AbortMode::Append,
- );
- Ok(writer)
- }
- // If the mode is put, create a new AsyncPut writer and return it
wrapped in
- // a boxed trait object
- FileWriterMode::Put => {
- let writer = Box::new(AsyncPutWriter::new(object.clone(),
object_store));
- let writer = AbortableWrite::new(
- file_compression_type.convert_async_writer(writer)?,
- AbortMode::Put,
- );
- Ok(writer)
- }
- // If the mode is put multipart, call the store's put_multipart method
and
- // return the writer wrapped in a boxed trait object.
- FileWriterMode::PutMultipart => {
- let (multipart_id, writer) = object_store
- .put_multipart(&object.location)
- .await
- .map_err(DataFusionError::ObjectStore)?;
- Ok(AbortableWrite::new(
- file_compression_type.convert_async_writer(writer)?,
- AbortMode::MultiPart(MultiPart::new(
- object_store,
- multipart_id,
- object.location.clone(),
- )),
- ))
- }
- }
+ let (multipart_id, writer) = object_store
+ .put_multipart(location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ Ok(AbortableWrite::new(
+ file_compression_type.convert_async_writer(writer)?,
+ MultiPart::new(object_store, multipart_id, location.clone()),
+ ))
}
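
create_writer now always opens a multipart upload and keeps the MultipartId so AbortableWrite::abort_writer can discard a failed write. The same idea expressed directly against the object_store API (a sketch; the helper and its error handling are assumptions):

    use std::sync::Arc;

    use object_store::{path::Path, ObjectStore};
    use tokio::io::AsyncWriteExt;

    // Start a multipart upload, and abort it (discarding any parts already
    // uploaded) if writing or finalising fails -- the role AbortableWrite plays.
    async fn put_or_abort(
        store: Arc<dyn ObjectStore>,
        location: &Path,
        payload: &[u8],
    ) -> Result<(), Box<dyn std::error::Error>> {
        let (multipart_id, mut writer) = store.put_multipart(location).await?;
        let write = async {
            writer.write_all(payload).await?;
            writer.shutdown().await // completing the upload makes the object visible
        }
        .await;
        if let Err(e) = write {
            store.abort_multipart(location, &multipart_id).await?;
            return Err(e.into());
        }
        Ok(())
    }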
diff --git a/datafusion/core/src/datasource/file_format/write/orchestration.rs
b/datafusion/core/src/datasource/file_format/write/orchestration.rs
index f84baa9ac2..2ae6b70ed1 100644
--- a/datafusion/core/src/datasource/file_format/write/orchestration.rs
+++ b/datafusion/core/src/datasource/file_format/write/orchestration.rs
@@ -22,7 +22,6 @@
use std::sync::Arc;
use crate::datasource::file_format::file_compression_type::FileCompressionType;
-use crate::datasource::listing::PartitionedFile;
use crate::datasource::physical_plan::FileSinkConfig;
use crate::error::Result;
use crate::physical_plan::SendableRecordBatchStream;
@@ -34,17 +33,13 @@ use datafusion_common::DataFusionError;
use bytes::Bytes;
use datafusion_execution::TaskContext;
-use futures::StreamExt;
-
-use object_store::{ObjectMeta, ObjectStore};
-
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver};
use tokio::task::{JoinHandle, JoinSet};
use tokio::try_join;
use super::demux::start_demuxer_task;
-use super::{create_writer, AbortableWrite, BatchSerializer, FileWriterMode};
+use super::{create_writer, AbortableWrite, BatchSerializer};
type WriterType = AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>;
type SerializerType = Box<dyn BatchSerializer>;
@@ -274,21 +269,9 @@ pub(crate) async fn stateless_multipart_put(
stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt,
unbounded_input)
.await
});
- while let Some((output_location, rb_stream)) = file_stream_rx.recv().await
{
+ while let Some((location, rb_stream)) = file_stream_rx.recv().await {
let serializer = get_serializer();
- let object_meta = ObjectMeta {
- location: output_location,
- last_modified: chrono::offset::Utc::now(),
- size: 0,
- e_tag: None,
- };
- let writer = create_writer(
- FileWriterMode::PutMultipart,
- compression,
- object_meta.into(),
- object_store.clone(),
- )
- .await?;
+ let writer = create_writer(compression, &location, object_store.clone()).await?;
tx_file_bundle
.send((rb_stream, serializer, writer))
@@ -325,91 +308,3 @@ pub(crate) async fn stateless_multipart_put(
Ok(total_count)
}
-
-/// Orchestrates append_all for any statelessly serialized file type. Appends
to all files provided
-/// in a round robin fashion.
-pub(crate) async fn stateless_append_all(
- mut data: SendableRecordBatchStream,
- context: &Arc<TaskContext>,
- object_store: Arc<dyn ObjectStore>,
- file_groups: &Vec<PartitionedFile>,
- unbounded_input: bool,
- compression: FileCompressionType,
- get_serializer: Box<dyn Fn(usize) -> Box<dyn BatchSerializer> + Send>,
-) -> Result<u64> {
- let rb_buffer_size = &context
- .session_config()
- .options()
- .execution
- .max_buffered_batches_per_output_file;
-
- let (tx_file_bundle, rx_file_bundle) =
tokio::sync::mpsc::channel(file_groups.len());
- let mut send_channels = vec![];
- for file_group in file_groups {
- let serializer = get_serializer(file_group.object_meta.size);
-
- let file = file_group.clone();
- let writer = create_writer(
- FileWriterMode::Append,
- compression,
- file.object_meta.clone().into(),
- object_store.clone(),
- )
- .await?;
-
- let (tx, rx) = tokio::sync::mpsc::channel(rb_buffer_size / 2);
- send_channels.push(tx);
- tx_file_bundle
- .send((rx, serializer, writer))
- .await
- .map_err(|_| {
- DataFusionError::Internal(
- "Writer receive file bundle channel closed
unexpectedly!".into(),
- )
- })?;
- }
-
- let (tx_row_cnt, rx_row_cnt) = tokio::sync::oneshot::channel();
- let write_coordinater_task = tokio::spawn(async move {
- stateless_serialize_and_write_files(rx_file_bundle, tx_row_cnt,
unbounded_input)
- .await
- });
-
- // Append to file groups in round robin
- let mut next_file_idx = 0;
- while let Some(rb) = data.next().await.transpose()? {
- send_channels[next_file_idx].send(rb).await.map_err(|_| {
- DataFusionError::Internal(
- "Recordbatch file append stream closed unexpectedly!".into(),
- )
- })?;
- next_file_idx = (next_file_idx + 1) % send_channels.len();
- if unbounded_input {
- tokio::task::yield_now().await;
- }
- }
- // Signal to the write coordinater that no more files are coming
- drop(tx_file_bundle);
- drop(send_channels);
-
- let total_count = rx_row_cnt.await.map_err(|_| {
- DataFusionError::Internal(
- "Did not receieve row count from write coordinater".into(),
- )
- })?;
-
- match try_join!(write_coordinater_task) {
- Ok(r1) => {
- r1.0?;
- }
- Err(e) => {
- if e.is_panic() {
- std::panic::resume_unwind(e.into_panic());
- } else {
- unreachable!();
- }
- }
- }
-
- Ok(total_count)
-}
diff --git a/datafusion/core/src/datasource/listing/mod.rs
b/datafusion/core/src/datasource/listing/mod.rs
index 8b0f021f02..aa2e20164b 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -31,9 +31,7 @@ use std::pin::Pin;
use std::sync::Arc;
pub use self::url::ListingTableUrl;
-pub use table::{
- ListingOptions, ListingTable, ListingTableConfig, ListingTableInsertMode,
-};
+pub use table::{ListingOptions, ListingTable, ListingTableConfig};
/// Stream of files get listed from object store
pub type PartitionedFileStream =
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index c22eb58e88..515bc8a9e6 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -214,33 +214,6 @@ impl ListingTableConfig {
}
}
-#[derive(Debug, Clone)]
-///controls how new data should be inserted to a ListingTable
-pub enum ListingTableInsertMode {
- ///Data should be appended to an existing file
- AppendToFile,
- ///Data is appended as new files in existing TablePaths
- AppendNewFiles,
- ///Throw an error if insert into is attempted on this table
- Error,
-}
-
-impl FromStr for ListingTableInsertMode {
- type Err = DataFusionError;
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- let s_lower = s.to_lowercase();
- match s_lower.as_str() {
- "append_to_file" => Ok(ListingTableInsertMode::AppendToFile),
- "append_new_files" => Ok(ListingTableInsertMode::AppendNewFiles),
- "error" => Ok(ListingTableInsertMode::Error),
- _ => plan_err!(
- "Unknown or unsupported insert mode {s}. Supported options are
\
- append_to_file, append_new_files, and error."
- ),
- }
- }
-}
-
/// Options for creating a [`ListingTable`]
#[derive(Clone, Debug)]
pub struct ListingOptions {
@@ -279,8 +252,6 @@ pub struct ListingOptions {
/// In order to support infinite inputs, DataFusion may adjust query
/// plans (e.g. joins) to run the given query in full pipelining mode.
pub infinite_source: bool,
- /// This setting controls how inserts to this table should be handled
- pub insert_mode: ListingTableInsertMode,
/// This setting when true indicates that the table is backed by a single
file.
/// Any inserts to the table may only append to this existing file.
pub single_file: bool,
@@ -305,7 +276,6 @@ impl ListingOptions {
target_partitions: 1,
file_sort_order: vec![],
infinite_source: false,
- insert_mode: ListingTableInsertMode::AppendToFile,
single_file: false,
file_type_write_options: None,
}
@@ -476,12 +446,6 @@ impl ListingOptions {
self
}
- /// Configure how insertions to this table should be handled.
- pub fn with_insert_mode(mut self, insert_mode: ListingTableInsertMode) ->
Self {
- self.insert_mode = insert_mode;
- self
- }
-
/// Configure if this table is backed by a sigle file
pub fn with_single_file(mut self, single_file: bool) -> Self {
self.single_file = single_file;
@@ -806,6 +770,13 @@ impl TableProvider for ListingTable {
}
let table_path = &self.table_paths()[0];
+ if !table_path.is_collection() {
+ return plan_err!(
+ "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
+ To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
+ );
+ }
+
// Get the object store for the table path.
let store = state.runtime_env().object_store(table_path)?;
@@ -820,31 +791,6 @@ impl TableProvider for ListingTable {
.await?;
let file_groups = file_list_stream.try_collect::<Vec<_>>().await?;
- //if we are writing a single output_partition to a table backed by a
single file
- //we can append to that file. Otherwise, we can write new files into
the directory
- //adding new files to the listing table in order to insert to the
table.
- let input_partitions = input.output_partitioning().partition_count();
- let writer_mode = match self.options.insert_mode {
- ListingTableInsertMode::AppendToFile => {
- if input_partitions > file_groups.len() {
- return plan_err!(
- "Cannot append {input_partitions} partitions to {}
files!",
- file_groups.len()
- );
- }
-
- crate::datasource::file_format::write::FileWriterMode::Append
- }
- ListingTableInsertMode::AppendNewFiles => {
-
crate::datasource::file_format::write::FileWriterMode::PutMultipart
- }
- ListingTableInsertMode::Error => {
- return plan_err!(
- "Invalid plan attempting write to table with
TableWriteMode::Error!"
- );
- }
- };
-
let file_format = self.options().format.as_ref();
let file_type_writer_options = match
&self.options().file_type_write_options {
@@ -862,7 +808,6 @@ impl TableProvider for ListingTable {
file_groups,
output_schema: self.schema(),
table_partition_cols: self.options.table_partition_cols.clone(),
- writer_mode,
// A plan can produce finite number of rows even if it has
unbounded sources, like LIMIT
// queries. Thus, we can check if the plan is streaming to ensure
file sink input is
// unbounded. When `unbounded_input` flag is `true` for sink, we
occasionally call `yield_now`
@@ -877,14 +822,6 @@ impl TableProvider for ListingTable {
let unsorted: Vec<Vec<Expr>> = vec![];
let order_requirements = if self.options().file_sort_order != unsorted
{
- if matches!(
- self.options().insert_mode,
- ListingTableInsertMode::AppendToFile
- ) {
- return plan_err!(
- "Cannot insert into a sorted ListingTable with mode
append!"
- );
- }
// Multiple sort orders in outer vec are equivalent, so we pass
only the first one
let ordering = self
.try_create_output_ordering()?
@@ -1003,7 +940,7 @@ mod tests {
use crate::prelude::*;
use crate::{
assert_batches_eq,
- datasource::file_format::{avro::AvroFormat,
file_compression_type::FileTypeExt},
+ datasource::file_format::avro::AvroFormat,
execution::options::ReadOptions,
logical_expr::{col, lit},
test::{columns, object_store::register_test_store},
@@ -1567,17 +1504,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_insert_into_append_to_json_file() -> Result<()> {
- helper_test_insert_into_append_to_existing_files(
- FileType::JSON,
- FileCompressionType::UNCOMPRESSED,
- None,
- )
- .await?;
- Ok(())
- }
-
#[tokio::test]
async fn test_insert_into_append_new_json_files() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
@@ -1596,17 +1522,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_insert_into_append_to_csv_file() -> Result<()> {
- helper_test_insert_into_append_to_existing_files(
- FileType::CSV,
- FileCompressionType::UNCOMPRESSED,
- None,
- )
- .await?;
- Ok(())
- }
-
#[tokio::test]
async fn test_insert_into_append_new_csv_files() -> Result<()> {
let mut config_map: HashMap<String, String> = HashMap::new();
@@ -1663,13 +1578,8 @@ mod tests {
#[tokio::test]
async fn test_insert_into_sql_csv_defaults() -> Result<()> {
- helper_test_insert_into_sql(
- "csv",
- FileCompressionType::UNCOMPRESSED,
- "OPTIONS (insert_mode 'append_new_files')",
- None,
- )
- .await?;
+ helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None)
+ .await?;
Ok(())
}
@@ -1678,8 +1588,7 @@ mod tests {
helper_test_insert_into_sql(
"csv",
FileCompressionType::UNCOMPRESSED,
- "WITH HEADER ROW \
- OPTIONS (insert_mode 'append_new_files')",
+ "WITH HEADER ROW",
None,
)
.await?;
@@ -1688,13 +1597,8 @@ mod tests {
#[tokio::test]
async fn test_insert_into_sql_json_defaults() -> Result<()> {
- helper_test_insert_into_sql(
- "json",
- FileCompressionType::UNCOMPRESSED,
- "OPTIONS (insert_mode 'append_new_files')",
- None,
- )
- .await?;
+ helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None)
+ .await?;
Ok(())
}
@@ -1879,211 +1783,6 @@ mod tests {
Ok(())
}
- #[tokio::test]
- async fn test_insert_into_append_to_parquet_file_fails() -> Result<()> {
- let maybe_err = helper_test_insert_into_append_to_existing_files(
- FileType::PARQUET,
- FileCompressionType::UNCOMPRESSED,
- None,
- )
- .await;
- let _err =
- maybe_err.expect_err("Appending to existing parquet file did not
fail!");
- Ok(())
- }
-
- fn load_empty_schema_table(
- schema: SchemaRef,
- temp_path: &str,
- insert_mode: ListingTableInsertMode,
- file_format: Arc<dyn FileFormat>,
- ) -> Result<Arc<dyn TableProvider>> {
- File::create(temp_path)?;
- let table_path = ListingTableUrl::parse(temp_path).unwrap();
-
- let listing_options =
-
ListingOptions::new(file_format.clone()).with_insert_mode(insert_mode);
-
- let config = ListingTableConfig::new(table_path)
- .with_listing_options(listing_options)
- .with_schema(schema);
-
- let table = ListingTable::try_new(config)?;
- Ok(Arc::new(table))
- }
-
- /// Logic of testing inserting into listing table by Appending to existing
files
- /// is the same for all formats/options which support this. This helper
allows
- /// passing different options to execute the same test with different
settings.
- async fn helper_test_insert_into_append_to_existing_files(
- file_type: FileType,
- file_compression_type: FileCompressionType,
- session_config_map: Option<HashMap<String, String>>,
- ) -> Result<()> {
- // Create the initial context, schema, and batch.
- let session_ctx = match session_config_map {
- Some(cfg) => {
- let config = SessionConfig::from_string_hash_map(cfg)?;
- SessionContext::new_with_config(config)
- }
- None => SessionContext::new(),
- };
- // Create a new schema with one field called "a" of type Int32
- let schema = Arc::new(Schema::new(vec![Field::new(
- "column1",
- DataType::Int32,
- false,
- )]));
-
- // Create a new batch of data to insert into the table
- let batch = RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
- )?;
-
- // Filename with extension
- let filename = format!(
- "path{}",
- file_type
- .to_owned()
- .get_ext_with_compression(file_compression_type)
- .unwrap()
- );
-
- // Create a temporary directory and a CSV file within it.
- let tmp_dir = TempDir::new()?;
- let path = tmp_dir.path().join(filename);
-
- let file_format: Arc<dyn FileFormat> = match file_type {
- FileType::CSV => Arc::new(
-
CsvFormat::default().with_file_compression_type(file_compression_type),
- ),
- FileType::JSON => Arc::new(
-
JsonFormat::default().with_file_compression_type(file_compression_type),
- ),
- FileType::PARQUET => Arc::new(ParquetFormat::default()),
- FileType::AVRO => Arc::new(AvroFormat {}),
- FileType::ARROW => Arc::new(ArrowFormat {}),
- };
-
- let initial_table = load_empty_schema_table(
- schema.clone(),
- path.to_str().unwrap(),
- ListingTableInsertMode::AppendToFile,
- file_format,
- )?;
- session_ctx.register_table("t", initial_table)?;
- // Create and register the source table with the provided schema and
inserted data
- let source_table = Arc::new(MemTable::try_new(
- schema.clone(),
- vec![vec![batch.clone(), batch.clone()]],
- )?);
- session_ctx.register_table("source", source_table.clone())?;
- // Convert the source table into a provider so that it can be used in
a query
- let source = provider_as_source(source_table);
- // Create a table scan logical plan to read from the source table
- let scan_plan = LogicalPlanBuilder::scan("source", source,
None)?.build()?;
- // Create an insert plan to insert the source data into the initial
table
- let insert_into_table =
- LogicalPlanBuilder::insert_into(scan_plan, "t", &schema,
false)?.build()?;
- // Create a physical plan from the insert plan
- let plan = session_ctx
- .state()
- .create_physical_plan(&insert_into_table)
- .await?;
-
- // Execute the physical plan and collect the results
- let res = collect(plan, session_ctx.task_ctx()).await?;
- // Insert returns the number of rows written, in our case this would
be 6.
- let expected = [
- "+-------+",
- "| count |",
- "+-------+",
- "| 6 |",
- "+-------+",
- ];
-
- // Assert that the batches read from the file match the expected
result.
- assert_batches_eq!(expected, &res);
-
- // Read the records in the table
- let batches = session_ctx.sql("select * from
t").await?.collect().await?;
-
- // Define the expected result as a vector of strings.
- let expected = [
- "+---------+",
- "| column1 |",
- "+---------+",
- "| 1 |",
- "| 2 |",
- "| 3 |",
- "| 1 |",
- "| 2 |",
- "| 3 |",
- "+---------+",
- ];
-
- // Assert that the batches read from the file match the expected
result.
- assert_batches_eq!(expected, &batches);
-
- // Assert that only 1 file was added to the table
- let num_files = tmp_dir.path().read_dir()?.count();
- assert_eq!(num_files, 1);
-
- // Create a physical plan from the insert plan
- let plan = session_ctx
- .state()
- .create_physical_plan(&insert_into_table)
- .await?;
-
- // Again, execute the physical plan and collect the results
- let res = collect(plan, session_ctx.task_ctx()).await?;
- // Insert returns the number of rows written, in our case this would
be 6.
- let expected = [
- "+-------+",
- "| count |",
- "+-------+",
- "| 6 |",
- "+-------+",
- ];
-
- // Assert that the batches read from the file match the expected
result.
- assert_batches_eq!(expected, &res);
-
- // Open the CSV file, read its contents as a record batch, and collect
the batches into a vector.
- let batches = session_ctx.sql("select * from
t").await?.collect().await?;
-
- // Define the expected result after the second append.
- let expected = vec![
- "+---------+",
- "| column1 |",
- "+---------+",
- "| 1 |",
- "| 2 |",
- "| 3 |",
- "| 1 |",
- "| 2 |",
- "| 3 |",
- "| 1 |",
- "| 2 |",
- "| 3 |",
- "| 1 |",
- "| 2 |",
- "| 3 |",
- "+---------+",
- ];
-
- // Assert that the batches read from the file after the second append
match the expected result.
- assert_batches_eq!(expected, &batches);
-
- // Assert that no additional files were added to the table
- let num_files = tmp_dir.path().read_dir()?.count();
- assert_eq!(num_files, 1);
-
- // Return Ok if the function
- Ok(())
- }
-
async fn helper_test_append_new_files_to_table(
file_type: FileType,
file_compression_type: FileCompressionType,
@@ -2129,7 +1828,6 @@ mod tests {
"t",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new()
-
.insert_mode(ListingTableInsertMode::AppendNewFiles)
.schema(schema.as_ref())
.file_compression_type(file_compression_type),
)
@@ -2141,7 +1839,6 @@ mod tests {
"t",
tmp_dir.path().to_str().unwrap(),
NdJsonReadOptions::default()
-
.insert_mode(ListingTableInsertMode::AppendNewFiles)
.schema(schema.as_ref())
.file_compression_type(file_compression_type),
)
@@ -2152,9 +1849,7 @@ mod tests {
.register_parquet(
"t",
tmp_dir.path().to_str().unwrap(),
- ParquetReadOptions::default()
-
.insert_mode(ListingTableInsertMode::AppendNewFiles)
- .schema(schema.as_ref()),
+ ParquetReadOptions::default().schema(schema.as_ref()),
)
.await?;
}
@@ -2163,10 +1858,7 @@ mod tests {
.register_avro(
"t",
tmp_dir.path().to_str().unwrap(),
- AvroReadOptions::default()
- // TODO implement insert_mode for avro
-
//.insert_mode(ListingTableInsertMode::AppendNewFiles)
- .schema(schema.as_ref()),
+ AvroReadOptions::default().schema(schema.as_ref()),
)
.await?;
}
@@ -2175,10 +1867,7 @@ mod tests {
.register_arrow(
"t",
tmp_dir.path().to_str().unwrap(),
- ArrowReadOptions::default()
- // TODO implement insert_mode for arrow
-
//.insert_mode(ListingTableInsertMode::AppendNewFiles)
- .schema(schema.as_ref()),
+ ArrowReadOptions::default().schema(schema.as_ref()),
)
.await?;
}
diff --git a/datafusion/core/src/datasource/listing/url.rs
b/datafusion/core/src/datasource/listing/url.rs
index 9197e37adb..ba3c3fae21 100644
--- a/datafusion/core/src/datasource/listing/url.rs
+++ b/datafusion/core/src/datasource/listing/url.rs
@@ -181,6 +181,11 @@ impl ListingTableUrl {
}
}
+ /// Returns `true` if `path` refers to a collection of objects
+ pub fn is_collection(&self) -> bool {
+ self.url.as_str().ends_with('/')
+ }
+
/// Strips the prefix of this [`ListingTableUrl`] from the provided path,
returning
/// an iterator of the remaining path segments
pub(crate) fn strip_prefix<'a, 'b: 'a>(
@@ -203,8 +208,7 @@ impl ListingTableUrl {
file_extension: &'a str,
) -> Result<BoxStream<'a, Result<ObjectMeta>>> {
// If the prefix is a file, use a head request, otherwise list
- let is_dir = self.url.as_str().ends_with('/');
- let list = match is_dir {
+ let list = match self.is_collection() {
true => match
ctx.runtime_env().cache_manager.get_list_files_cache() {
None => futures::stream::once(store.list(Some(&self.prefix)))
.try_flatten()
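
For reference, is_collection simply keys off the trailing `/`, which is what the new insert check in ListingTable relies on. A small illustration (URLs are placeholders):

    use datafusion::datasource::listing::ListingTableUrl;
    use datafusion::error::Result;

    fn collection_vs_object() -> Result<()> {
        // Trailing `/`: a collection (directory) of objects -- inserts allowed.
        let dir = ListingTableUrl::parse("s3://bucket/data/")?;
        assert!(dir.is_collection());

        // No trailing `/`: a single object -- ListingTable inserts now error,
        // pointing users at StreamTable / CREATE UNBOUNDED EXTERNAL TABLE.
        let file = ListingTableUrl::parse("s3://bucket/data/part-0.csv")?;
        assert!(!file.is_collection());
        Ok(())
    }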
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs
b/datafusion/core/src/datasource/listing_table_factory.rs
index f9a7ab04ce..543a3a83f7 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -21,8 +21,6 @@ use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
-use super::listing::ListingTableInsertMode;
-
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::{
@@ -38,7 +36,7 @@ use crate::execution::context::SessionState;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::file_options::{FileTypeWriterOptions, StatementOptions};
-use datafusion_common::{DataFusionError, FileType};
+use datafusion_common::{plan_err, DataFusionError, FileType};
use datafusion_expr::CreateExternalTable;
use async_trait::async_trait;
@@ -149,19 +147,12 @@ impl TableProviderFactory for ListingTableFactory {
.take_bool_option("single_file")?
.unwrap_or(false);
- let explicit_insert_mode =
statement_options.take_str_option("insert_mode");
- let insert_mode = match explicit_insert_mode {
- Some(mode) => ListingTableInsertMode::from_str(mode.as_str()),
- None => match file_type {
- FileType::CSV => Ok(ListingTableInsertMode::AppendToFile),
- #[cfg(feature = "parquet")]
- FileType::PARQUET =>
Ok(ListingTableInsertMode::AppendNewFiles),
- FileType::AVRO => Ok(ListingTableInsertMode::AppendNewFiles),
- FileType::JSON => Ok(ListingTableInsertMode::AppendToFile),
- FileType::ARROW => Ok(ListingTableInsertMode::AppendNewFiles),
- },
- }?;
-
+ // Backwards compatibility
+ if let Some(s) = statement_options.take_str_option("insert_mode") {
+ if !s.eq_ignore_ascii_case("append_new_files") {
+ return plan_err!("Unknown or unsupported insert mode {s}. Only append_to_file supported");
+ }
+ }
let file_type = file_format.file_type();
// Use remaining options and session state to build
FileTypeWriterOptions
@@ -214,7 +205,6 @@ impl TableProviderFactory for ListingTableFactory {
.with_target_partitions(state.config().target_partitions())
.with_table_partition_cols(table_partition_cols)
.with_file_sort_order(cmd.order_exprs.clone())
- .with_insert_mode(insert_mode)
.with_single_file(single_file)
.with_write_options(file_type_writer_options)
.with_infinite_source(unbounded);
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs
b/datafusion/core/src/datasource/physical_plan/mod.rs
index ea0a9698ff..738e70966b 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -49,10 +49,7 @@ use std::{
use super::listing::ListingTableUrl;
use crate::error::{DataFusionError, Result};
-use crate::{
- datasource::file_format::write::FileWriterMode,
- physical_plan::{DisplayAs, DisplayFormatType},
-};
+use crate::physical_plan::{DisplayAs, DisplayFormatType};
use crate::{
datasource::{
listing::{FileRange, PartitionedFile},
@@ -90,8 +87,6 @@ pub struct FileSinkConfig {
/// A vector of column names and their corresponding data types,
/// representing the partitioning columns for the file
pub table_partition_cols: Vec<(String, DataType)>,
- /// A writer mode that determines how data is written to the file
- pub writer_mode: FileWriterMode,
/// If true, it is assumed there is a single table_path which is a file to
which all data should be written
/// regardless of input partitioning. Otherwise, each table path is
assumed to be a directory
/// to which each output partition is written to its own output file.
diff --git a/datafusion/core/src/datasource/stream.rs
b/datafusion/core/src/datasource/stream.rs
index fc19ff954d..6965968b6f 100644
--- a/datafusion/core/src/datasource/stream.rs
+++ b/datafusion/core/src/datasource/stream.rs
@@ -171,7 +171,7 @@ impl StreamConfig {
match &self.encoding {
StreamEncoding::Csv => {
let header = self.header && !self.location.exists();
- let file =
OpenOptions::new().write(true).open(&self.location)?;
+ let file =
OpenOptions::new().append(true).open(&self.location)?;
let writer = arrow::csv::WriterBuilder::new()
.with_header(header)
.build(file);
@@ -179,7 +179,7 @@ impl StreamConfig {
Ok(Box::new(writer))
}
StreamEncoding::Json => {
- let file =
OpenOptions::new().write(true).open(&self.location)?;
+ let file =
OpenOptions::new().append(true).open(&self.location)?;
Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
}
}
@@ -298,7 +298,12 @@ struct StreamWrite(Arc<StreamConfig>);
impl DisplayAs for StreamWrite {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) ->
std::fmt::Result {
- write!(f, "{self:?}")
+ f.debug_struct("StreamWrite")
+ .field("location", &self.0.location)
+ .field("batch_size", &self.0.batch_size)
+ .field("encoding", &self.0.encoding)
+ .field("header", &self.0.header)
+ .finish_non_exhaustive()
}
}
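
The OpenOptions change above matters because `.write(true)` alone leaves the cursor at offset 0, so each reopened stream sink would overwrite earlier rows; `.append(true)` places every write at the end of the file. A minimal standalone illustration:

    use std::fs::OpenOptions;
    use std::io::Write;

    fn append_row(path: &str, row: &str) -> std::io::Result<()> {
        // `.append(true)` implies write access and positions every write at the
        // current end of file, so repeated opens keep extending the stream
        // instead of clobbering what an earlier writer produced.
        let mut file = OpenOptions::new().append(true).open(path)?;
        writeln!(file, "{row}")
    }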
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 1f1ef73cae..82d96c98e6 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -27,7 +27,6 @@ use crate::datasource::file_format::csv::CsvFormat;
use crate::datasource::file_format::json::JsonFormat;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
-use crate::datasource::file_format::write::FileWriterMode;
use crate::datasource::file_format::FileFormat;
use crate::datasource::listing::ListingTableUrl;
use crate::datasource::physical_plan::FileSinkConfig;
@@ -591,7 +590,6 @@ impl DefaultPhysicalPlanner {
output_schema: Arc::new(schema),
table_partition_cols: vec![],
unbounded_input: false,
- writer_mode: FileWriterMode::PutMultipart,
single_file_output: *single_file_output,
overwrite: false,
file_type_writer_options
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index ad83ea1fce..750d12bd77 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -1157,12 +1157,6 @@ message PhysicalPlanNode {
}
}
-enum FileWriterMode {
- APPEND = 0;
- PUT = 1;
- PUT_MULTIPART = 2;
-}
-
enum CompressionTypeVariant {
GZIP = 0;
BZIP2 = 1;
@@ -1187,12 +1181,13 @@ message JsonWriterOptions {
}
message FileSinkConfig {
+ reserved 6; // writer_mode
+
string object_store_url = 1;
repeated PartitionedFile file_groups = 2;
repeated string table_paths = 3;
Schema output_schema = 4;
repeated PartitionColumn table_partition_cols = 5;
- FileWriterMode writer_mode = 6;
bool single_file_output = 7;
bool unbounded_input = 8;
bool overwrite = 9;
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 016719a600..af64bd68de 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -7471,9 +7471,6 @@ impl serde::Serialize for FileSinkConfig {
if !self.table_partition_cols.is_empty() {
len += 1;
}
- if self.writer_mode != 0 {
- len += 1;
- }
if self.single_file_output {
len += 1;
}
@@ -7502,11 +7499,6 @@ impl serde::Serialize for FileSinkConfig {
if !self.table_partition_cols.is_empty() {
struct_ser.serialize_field("tablePartitionCols",
&self.table_partition_cols)?;
}
- if self.writer_mode != 0 {
- let v = FileWriterMode::try_from(self.writer_mode)
- .map_err(|_| serde::ser::Error::custom(format!("Invalid
variant {}", self.writer_mode)))?;
- struct_ser.serialize_field("writerMode", &v)?;
- }
if self.single_file_output {
struct_ser.serialize_field("singleFileOutput",
&self.single_file_output)?;
}
@@ -7539,8 +7531,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
"outputSchema",
"table_partition_cols",
"tablePartitionCols",
- "writer_mode",
- "writerMode",
"single_file_output",
"singleFileOutput",
"unbounded_input",
@@ -7557,7 +7547,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
TablePaths,
OutputSchema,
TablePartitionCols,
- WriterMode,
SingleFileOutput,
UnboundedInput,
Overwrite,
@@ -7588,7 +7577,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
"tablePaths" | "table_paths" =>
Ok(GeneratedField::TablePaths),
"outputSchema" | "output_schema" =>
Ok(GeneratedField::OutputSchema),
"tablePartitionCols" | "table_partition_cols" =>
Ok(GeneratedField::TablePartitionCols),
- "writerMode" | "writer_mode" =>
Ok(GeneratedField::WriterMode),
"singleFileOutput" | "single_file_output" =>
Ok(GeneratedField::SingleFileOutput),
"unboundedInput" | "unbounded_input" =>
Ok(GeneratedField::UnboundedInput),
"overwrite" => Ok(GeneratedField::Overwrite),
@@ -7617,7 +7605,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
let mut table_paths__ = None;
let mut output_schema__ = None;
let mut table_partition_cols__ = None;
- let mut writer_mode__ = None;
let mut single_file_output__ = None;
let mut unbounded_input__ = None;
let mut overwrite__ = None;
@@ -7654,12 +7641,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
}
table_partition_cols__ = Some(map_.next_value()?);
}
- GeneratedField::WriterMode => {
- if writer_mode__.is_some() {
- return
Err(serde::de::Error::duplicate_field("writerMode"));
- }
- writer_mode__ =
Some(map_.next_value::<FileWriterMode>()? as i32);
- }
GeneratedField::SingleFileOutput => {
if single_file_output__.is_some() {
return
Err(serde::de::Error::duplicate_field("singleFileOutput"));
@@ -7692,7 +7673,6 @@ impl<'de> serde::Deserialize<'de> for FileSinkConfig {
table_paths: table_paths__.unwrap_or_default(),
output_schema: output_schema__,
table_partition_cols:
table_partition_cols__.unwrap_or_default(),
- writer_mode: writer_mode__.unwrap_or_default(),
single_file_output:
single_file_output__.unwrap_or_default(),
unbounded_input: unbounded_input__.unwrap_or_default(),
overwrite: overwrite__.unwrap_or_default(),
@@ -7800,80 +7780,6 @@ impl<'de> serde::Deserialize<'de> for
FileTypeWriterOptions {
deserializer.deserialize_struct("datafusion.FileTypeWriterOptions",
FIELDS, GeneratedVisitor)
}
}
-impl serde::Serialize for FileWriterMode {
- #[allow(deprecated)]
- fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok,
S::Error>
- where
- S: serde::Serializer,
- {
- let variant = match self {
- Self::Append => "APPEND",
- Self::Put => "PUT",
- Self::PutMultipart => "PUT_MULTIPART",
- };
- serializer.serialize_str(variant)
- }
-}
-impl<'de> serde::Deserialize<'de> for FileWriterMode {
- #[allow(deprecated)]
- fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
- where
- D: serde::Deserializer<'de>,
- {
- const FIELDS: &[&str] = &[
- "APPEND",
- "PUT",
- "PUT_MULTIPART",
- ];
-
- struct GeneratedVisitor;
-
- impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
- type Value = FileWriterMode;
-
- fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) ->
std::fmt::Result {
- write!(formatter, "expected one of: {:?}", &FIELDS)
- }
-
- fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- i32::try_from(v)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_else(|| {
- serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self)
- })
- }
-
- fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- i32::try_from(v)
- .ok()
- .and_then(|x| x.try_into().ok())
- .ok_or_else(|| {
- serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self)
- })
- }
-
- fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
- where
- E: serde::de::Error,
- {
- match value {
- "APPEND" => Ok(FileWriterMode::Append),
- "PUT" => Ok(FileWriterMode::Put),
- "PUT_MULTIPART" => Ok(FileWriterMode::PutMultipart),
- _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
- }
- }
- }
- deserializer.deserialize_any(GeneratedVisitor)
- }
-}
impl serde::Serialize for FilterExecNode {
#[allow(deprecated)]
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index 647f814fda..b23f09e91b 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -1615,8 +1615,6 @@ pub struct FileSinkConfig {
pub output_schema: ::core::option::Option<Schema>,
#[prost(message, repeated, tag = "5")]
pub table_partition_cols: ::prost::alloc::vec::Vec<PartitionColumn>,
- #[prost(enumeration = "FileWriterMode", tag = "6")]
- pub writer_mode: i32,
#[prost(bool, tag = "7")]
pub single_file_output: bool,
#[prost(bool, tag = "8")]
@@ -3200,35 +3198,6 @@ impl UnionMode {
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
-pub enum FileWriterMode {
- Append = 0,
- Put = 1,
- PutMultipart = 2,
-}
-impl FileWriterMode {
- /// String value of the enum field names used in the ProtoBuf definition.
- ///
- /// The values are not transformed in any way and thus are considered stable
- /// (if the ProtoBuf definition does not change) and safe for programmatic use.
- pub fn as_str_name(&self) -> &'static str {
- match self {
- FileWriterMode::Append => "APPEND",
- FileWriterMode::Put => "PUT",
- FileWriterMode::PutMultipart => "PUT_MULTIPART",
- }
- }
- /// Creates an enum from field names used in the ProtoBuf definition.
- pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
- match value {
- "APPEND" => Some(Self::Append),
- "PUT" => Some(Self::Put),
- "PUT_MULTIPART" => Some(Self::PutMultipart),
- _ => None,
- }
- }
-}
-#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
-#[repr(i32)]
pub enum CompressionTypeVariant {
Gzip = 0,
Bzip2 = 1,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs
b/datafusion/proto/src/physical_plan/from_proto.rs
index 22b74db9af..f5771ddb15 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -23,7 +23,6 @@ use std::sync::Arc;
use arrow::compute::SortOptions;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::file_format::json::JsonSink;
-use datafusion::datasource::file_format::write::FileWriterMode;
use datafusion::datasource::listing::{FileRange, ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
@@ -739,7 +738,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
table_paths,
output_schema: Arc::new(convert_required!(conf.output_schema)?),
table_partition_cols,
- writer_mode: conf.writer_mode().into(),
single_file_output: conf.single_file_output,
unbounded_input: conf.unbounded_input,
overwrite: conf.overwrite,
@@ -748,16 +746,6 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
}
}
-impl From<protobuf::FileWriterMode> for FileWriterMode {
- fn from(value: protobuf::FileWriterMode) -> Self {
- match value {
- protobuf::FileWriterMode::Append => Self::Append,
- protobuf::FileWriterMode::Put => Self::Put,
- protobuf::FileWriterMode::PutMultipart => Self::PutMultipart,
- }
- }
-}
-
impl From<protobuf::CompressionTypeVariant> for CompressionTypeVariant {
fn from(value: protobuf::CompressionTypeVariant) -> Self {
match value {
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index b8a590b0dc..44864be947 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -31,7 +31,6 @@ use datafusion::datasource::{
file_format::json::JsonSink, physical_plan::FileScanConfig,
};
use datafusion::datasource::{
- file_format::write::FileWriterMode,
listing::{FileRange, PartitionedFile},
physical_plan::FileSinkConfig,
};
@@ -819,7 +818,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
type Error = DataFusionError;
fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
- let writer_mode: protobuf::FileWriterMode = conf.writer_mode.into();
let file_groups = conf
.file_groups
.iter()
@@ -847,7 +845,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
table_paths,
output_schema: Some(conf.output_schema.as_ref().try_into()?),
table_partition_cols,
- writer_mode: writer_mode.into(),
single_file_output: conf.single_file_output,
unbounded_input: conf.unbounded_input,
overwrite: conf.overwrite,
@@ -856,16 +853,6 @@ impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
}
}
-impl From<FileWriterMode> for protobuf::FileWriterMode {
- fn from(value: FileWriterMode) -> Self {
- match value {
- FileWriterMode::Append => Self::Append,
- FileWriterMode::Put => Self::Put,
- FileWriterMode::PutMultipart => Self::PutMultipart,
- }
- }
-}
-
impl From<&CompressionTypeVariant> for protobuf::CompressionTypeVariant {
fn from(value: &CompressionTypeVariant) -> Self {
match value {
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 076ca41581..23b0ea43c7 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -22,7 +22,6 @@ use datafusion::arrow::array::ArrayRef;
use datafusion::arrow::compute::kernels::sort::SortOptions;
use datafusion::arrow::datatypes::{DataType, Field, Fields, IntervalUnit, Schema};
use datafusion::datasource::file_format::json::JsonSink;
-use datafusion::datasource::file_format::write::FileWriterMode;
use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
@@ -732,7 +731,6 @@ fn roundtrip_json_sink() -> Result<()> {
table_paths: vec![ListingTableUrl::parse("file:///")?],
output_schema: schema.clone(),
table_partition_cols: vec![("plan_type".to_string(), DataType::Utf8)],
- writer_mode: FileWriterMode::Put,
single_file_output: true,
unbounded_input: false,
overwrite: true,
diff --git a/datafusion/sqllogictest/test_files/copy.slt
b/datafusion/sqllogictest/test_files/copy.slt
index 6e4a711a01..fbf1523477 100644
--- a/datafusion/sqllogictest/test_files/copy.slt
+++ b/datafusion/sqllogictest/test_files/copy.slt
@@ -32,7 +32,7 @@ logical_plan
CopyTo: format=parquet output_url=test_files/scratch/copy/table single_file_output=false options: (compression 'zstd(10)')
--TableScan: source_table projection=[col1, col2]
physical_plan
-FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=ParquetSink(file_groups=[])
--MemoryExec: partitions=1, partition_sizes=[1]
# Error case
diff --git a/datafusion/sqllogictest/test_files/errors.slt
b/datafusion/sqllogictest/test_files/errors.slt
index 4aded8a576..e3b2610e51 100644
--- a/datafusion/sqllogictest/test_files/errors.slt
+++ b/datafusion/sqllogictest/test_files/errors.slt
@@ -133,4 +133,4 @@ order by c9
statement error Inconsistent data type across values list at row 1 column 0. Was Int64 but found Utf8
-create table foo as values (1), ('foo');
\ No newline at end of file
+create table foo as values (1), ('foo');
diff --git a/datafusion/sqllogictest/test_files/explain.slt
b/datafusion/sqllogictest/test_files/explain.slt
index 9726c35a31..129814767c 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -140,7 +140,7 @@ physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/te
# create a sink table, path is same with aggregate_test_100 table
# we do not overwrite this file, we only assert plan.
statement ok
-CREATE EXTERNAL TABLE sink_table (
+CREATE UNBOUNDED EXTERNAL TABLE sink_table (
c1 VARCHAR NOT NULL,
c2 TINYINT NOT NULL,
c3 SMALLINT NOT NULL,
@@ -168,7 +168,7 @@ Dml: op=[Insert Into] table=[sink_table]
----Sort: aggregate_test_100.c1 ASC NULLS LAST
------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
physical_plan
-FileSinkExec: sink=CsvSink(writer_mode=Append, file_groups=[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv])
+FileSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. }
--SortExec: expr=[c1@0 ASC NULLS LAST]
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true
diff --git a/datafusion/sqllogictest/test_files/insert.slt
b/datafusion/sqllogictest/test_files/insert.slt
index 9860bdcae0..a100b5ac6b 100644
--- a/datafusion/sqllogictest/test_files/insert.slt
+++ b/datafusion/sqllogictest/test_files/insert.slt
@@ -289,7 +289,7 @@ insert into table_without_values values(2, NULL);
----
1
-# insert NULL values for the missing column (field2)
+# insert NULL values for the missing column (field2)
query II
insert into table_without_values(field1) values(3);
----
diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt
b/datafusion/sqllogictest/test_files/insert_to_external.slt
index 4441036241..39323479ff 100644
--- a/datafusion/sqllogictest/test_files/insert_to_external.slt
+++ b/datafusion/sqllogictest/test_files/insert_to_external.slt
@@ -100,7 +100,7 @@ Dml: op=[Insert Into] table=[ordered_insert_test]
--Projection: column1 AS a, column2 AS b
----Values: (Int64(5), Int64(1)), (Int64(4), Int64(2)), (Int64(7), Int64(7)), (Int64(7), Int64(8)), (Int64(7), Int64(9))...
physical_plan
-FileSinkExec: sink=CsvSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=CsvSink(file_groups=[])
--SortExec: expr=[a@0 ASC NULLS LAST,b@1 DESC]
----ProjectionExec: expr=[column1@0 as a, column2@1 as b]
------ValuesExec
@@ -254,6 +254,22 @@ create_local_path 'true',
single_file 'true',
);
+query error DataFusion error: Error during planning: Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`\. To append to an existing file use StreamTable, e\.g\. by using CREATE UNBOUNDED EXTERNAL TABLE
+INSERT INTO single_file_test values (1, 2), (3, 4);
+
+statement ok
+drop table single_file_test;
+
+statement ok
+CREATE UNBOUNDED EXTERNAL TABLE
+single_file_test(a bigint, b bigint)
+STORED AS csv
+LOCATION 'test_files/scratch/insert_to_external/single_csv_table.csv'
+OPTIONS(
+create_local_path 'true',
+single_file 'true',
+);
+
query II
INSERT INTO single_file_test values (1, 2), (3, 4);
----
@@ -315,7 +331,7 @@ Dml: op=[Insert Into] table=[table_without_values]
--------WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
-FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=ParquetSink(file_groups=[])
--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4) PARTITION
BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(*) PAR [...]
@@ -378,7 +394,7 @@ Dml: op=[Insert Into] table=[table_without_values]
----WindowAggr: windowExpr=[[SUM(CAST(aggregate_test_100.c4 AS Int64))
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS
LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING AS COUNT(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
------TableScan: aggregate_test_100 projection=[c1, c4, c9]
physical_plan
-FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=ParquetSink(file_groups=[])
--CoalescePartitionsExec
----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(*) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4) PARTITION BY
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 1 FOLLOWING: Ok(Field { name:
"SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1
FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound [...]
@@ -422,7 +438,7 @@ Dml: op=[Insert Into] table=[table_without_values]
----Sort: aggregate_test_100.c1 ASC NULLS LAST
------TableScan: aggregate_test_100 projection=[c1]
physical_plan
-FileSinkExec: sink=ParquetSink(writer_mode=PutMultipart, file_groups=[])
+FileSinkExec: sink=ParquetSink(file_groups=[])
--SortExec: expr=[c1@0 ASC NULLS LAST]
----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index 737b43b5a9..0fea8da5a3 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -3486,4 +3486,3 @@ set datafusion.optimizer.prefer_existing_sort = false;
statement ok
drop table annotated_data;
-
diff --git a/datafusion/sqllogictest/test_files/options.slt
b/datafusion/sqllogictest/test_files/options.slt
index 83fe85745e..9366a9b3b3 100644
--- a/datafusion/sqllogictest/test_files/options.slt
+++ b/datafusion/sqllogictest/test_files/options.slt
@@ -84,7 +84,7 @@ statement ok
drop table a
# test datafusion.sql_parser.parse_float_as_decimal
-#
+#
# default option value is false
query RR
select 10000000000000000000.01, -10000000000000000000.01
@@ -209,5 +209,3 @@ select -123456789.0123456789012345678901234567890
# Restore option to default value
statement ok
set datafusion.sql_parser.parse_float_as_decimal = false;
-
-
diff --git a/datafusion/sqllogictest/test_files/order.slt
b/datafusion/sqllogictest/test_files/order.slt
index 8148f1c4c7..9c5d1704f4 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -447,7 +447,7 @@ statement ok
drop table multiple_ordered_table;
# Create tables having some ordered columns. In the next step, we will expect
to observe that scalar
-# functions, such as mathematical functions like atan(), ceil(), sqrt(), or
date_time functions
+# functions, such as mathematical functions like atan(), ceil(), sqrt(), or
date_time functions
# like date_bin() and date_trunc(), will maintain the order of its argument
columns.
statement ok
CREATE EXTERNAL TABLE csv_with_timestamps (
diff --git a/datafusion/sqllogictest/test_files/predicates.slt
b/datafusion/sqllogictest/test_files/predicates.slt
index d22b2ff953..e992a440d0 100644
--- a/datafusion/sqllogictest/test_files/predicates.slt
+++ b/datafusion/sqllogictest/test_files/predicates.slt
@@ -495,6 +495,7 @@ set datafusion.execution.parquet.bloom_filter_enabled=true;
query T
SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'foo';
+----
query T
SELECT * FROM data_index_bloom_encoding_stats WHERE "String" = 'test';
diff --git a/datafusion/sqllogictest/test_files/set_variable.slt
b/datafusion/sqllogictest/test_files/set_variable.slt
index 714e1e995e..440fb2c6ef 100644
--- a/datafusion/sqllogictest/test_files/set_variable.slt
+++ b/datafusion/sqllogictest/test_files/set_variable.slt
@@ -243,4 +243,4 @@ statement ok
SET TIME ZONE = 'Asia/Taipei2'
statement error Arrow error: Parser error: Invalid timezone "Asia/Taipei2":
'Asia/Taipei2' is not a valid timezone
-SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ
\ No newline at end of file
+SELECT '2000-01-01T00:00:00'::TIMESTAMP::TIMESTAMPTZ
diff --git a/datafusion/sqllogictest/test_files/update.slt
b/datafusion/sqllogictest/test_files/update.slt
index c88082fc72..6412c3ca85 100644
--- a/datafusion/sqllogictest/test_files/update.slt
+++ b/datafusion/sqllogictest/test_files/update.slt
@@ -89,4 +89,4 @@ Dml: op=[Update] table=[t1]
------CrossJoin:
--------SubqueryAlias: t
----------TableScan: t1
---------TableScan: t2
\ No newline at end of file
+--------TableScan: t2