alamb commented on code in PR #6526:
URL: https://github.com/apache/arrow-datafusion/pull/6526#discussion_r1216501062


##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -759,6 +759,62 @@ impl TableProvider for ListingTable {
     fn get_table_definition(&self) -> Option<&str> {
         self.definition.as_deref()
     }
+
+    async fn insert_into(
+        &self,
+        state: &SessionState,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Check that the schema of the plan matches the schema of this table.
+        if !input.schema().eq(&self.schema()) {
+            return Err(DataFusionError::Plan(
+                // Return an error if schema of the input query does not match 
with the table schema.
+                "Inserting query must have the same schema with the 
table.".to_string(),
+            ));
+        }
+
+        if self.table_paths().len() > 1 {
+            return Err(DataFusionError::Plan(
+                "Datafusion currently supports tables from single 
file.".to_owned(),

Review Comment:
   ```suggestion
                   "Writing to a table backed by multiple files is not 
supported.".to_owned(),
   ```



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -759,6 +759,62 @@ impl TableProvider for ListingTable {
     fn get_table_definition(&self) -> Option<&str> {
         self.definition.as_deref()
     }
+
+    async fn insert_into(
+        &self,
+        state: &SessionState,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // Check that the schema of the plan matches the schema of this table.
+        if !input.schema().eq(&self.schema()) {
+            return Err(DataFusionError::Plan(
+                // Return an error if schema of the input query does not match 
with the table schema.
+                "Inserting query must have the same schema with the 
table.".to_string(),
+            ));
+        }
+
+        if self.table_paths().len() > 1 {
+            return Err(DataFusionError::Plan(
+                "Datafusion currently supports tables from single 
file.".to_owned(),
+            ));
+        }
+
+        let table_path = &self.table_paths()[0];
+        // Get the object store for the table path.
+        let store = state.runtime_env().object_store(table_path)?;
+
+        let file_list_future = pruned_partition_list(

Review Comment:
   I think this is a Stream (though I suppose that is probably technically a 
future as well) -- it might be clearer to call this `file_list_stream`



##########
datafusion/core/src/physical_plan/file_format/csv.rs:
##########
@@ -870,4 +948,66 @@ mod tests {
             }
         }
     }
+
+    async fn register_aggregate_csv_by_sql(ctx: &SessionContext, table_name: 
&str) {
+        let testdata = arrow_test_data();
+
+        ctx.sql(&format!(
+            "CREATE EXTERNAL TABLE {table_name} (
+                c1  VARCHAR NOT NULL,
+                c2  TINYINT NOT NULL,
+                c3  SMALLINT NOT NULL,
+                c4  SMALLINT NOT NULL,
+                c5  INTEGER NOT NULL,
+                c6  BIGINT NOT NULL,
+                c7  SMALLINT NOT NULL,
+                c8  INT NOT NULL,
+                c9  INT UNSIGNED NOT NULL,
+                c10 BIGINT UNSIGNED NOT NULL,
+                c11 FLOAT NOT NULL,
+                c12 DOUBLE NOT NULL,
+                c13 VARCHAR NOT NULL
+            )
+            STORED AS CSV
+            WITH HEADER ROW
+            LOCATION '{testdata}/csv/aggregate_test_100.csv'
+        "
+        ))
+        .await
+        .expect("Creating dataframe for CREATE EXTERNAL TABLE");
+    }
+
+    #[tokio::test]
+    async fn test_listing_table_insert_into() -> Result<()> {

Review Comment:
   I recommend this be put into an `sqllogictest` rather than here (as it 
appears to be a simple explain plan test)



##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -324,6 +349,188 @@ fn build_schema_helper(names: Vec<String>, types: 
&[HashSet<DataType>]) -> Schem
     Schema::new(fields)
 }
 
+impl Default for CsvSerializer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Define a struct for serializing CSV records to a stream
+pub struct CsvSerializer {
+    // CSV writer builder
+    builder: WriterBuilder,
+    // Inner buffer for avoiding reallocation
+    buffer: Vec<u8>,
+    // Flag to indicate whether there will be a header
+    header: bool,
+}
+
+impl CsvSerializer {
+    /// Constructor for the CsvSerializer object
+    pub fn new() -> Self {
+        Self {
+            builder: WriterBuilder::new(),
+            header: true,
+            buffer: Vec::with_capacity(4096),
+        }
+    }
+
+    /// Method for setting the CSV writer builder
+    pub fn with_builder(mut self, builder: WriterBuilder) -> Self {
+        self.builder = builder;
+        self
+    }
+
+    /// Method for setting the CSV writer header status
+    pub fn with_header(mut self, header: bool) -> Self {
+        self.header = header;
+        self
+    }
+}
+
+#[async_trait]
+impl BatchSerializer for CsvSerializer {
+    async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+        let builder = self.builder.clone();
+        let mut writer = builder.has_headers(self.header).build(&mut 
self.buffer);
+        writer.write(&batch)?;
+        drop(writer);
+        self.header = false;
+        Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))

Review Comment:
   👍 



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -87,6 +98,277 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// Take a list of files and the configuration to convert it to the
+    /// appropriate writer executor according to this file format.
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        _conf: FileSinkConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let msg = "Writer not implemented for this format".to_owned();
+        Err(DataFusionError::NotImplemented(msg))
+    }
+}
+
+/// `AsyncPutWriter` is an object that facilitates asynchronous writing to 
object stores.
+/// It is specifically designed for the `object_store` crate's `put` method 
and sends
+/// whole bytes at once when the buffer is flushed.
+pub struct AsyncPutWriter {
+    /// Object metadata
+    object_meta: ObjectMeta,
+    /// A shared reference to the object store
+    store: Arc<dyn ObjectStore>,
+    /// A buffer that stores the bytes to be sent
+    current_buffer: Vec<u8>,
+    /// Used for async handling in flush method
+    inner_state: AsyncPutState,
+}
+
+impl AsyncPutWriter {
+    /// Constructor for the `AsyncPutWriter` object
+    pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            object_meta,
+            store,
+            current_buffer: vec![],
+            // The writer starts out in buffering mode
+            inner_state: AsyncPutState::Buffer,
+        }
+    }
+
+    /// Separate implementation function that unpins the [`AsyncPutWriter`] so
+    /// that partial borrows work correctly
+    fn poll_shutdown_inner(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        loop {
+            match &mut self.inner_state {
+                AsyncPutState::Buffer => {
+                    // Convert the current buffer to bytes and take ownership 
of it
+                    let bytes = Bytes::from(mem::take(&mut 
self.current_buffer));
+                    // Set the inner state to Put variant with the bytes
+                    self.inner_state = AsyncPutState::Put { bytes }
+                }
+                AsyncPutState::Put { bytes } => {
+                    // Send the bytes to the object store's put method
+                    return Poll::Ready(
+                        ready!(self
+                            .store
+                            .put(&self.object_meta.location, bytes.clone())
+                            .poll_unpin(cx))
+                        .map_err(Error::from),
+                    );
+                }
+            }
+        }
+    }
+}
+
+/// An enum that represents the inner state of AsyncPut
+enum AsyncPutState {
+    /// Building Bytes struct in this state
+    Buffer,
+    /// Data in the buffer is being sent to the object store
+    Put { bytes: Bytes },
+}
+
+impl AsyncWrite for AsyncPutWriter {
+    // Define the implementation of the AsyncWrite trait for the 
`AsyncPutWriter` struct
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        // Extend the current buffer with the incoming buffer
+        self.current_buffer.extend_from_slice(buf);
+        // Return a ready poll with the length of the incoming buffer
+        Poll::Ready(Ok(buf.len()))
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        // Return a ready poll with an empty result

Review Comment:
   According to 
https://docs.rs/tokio/1.28.2/tokio/io/trait.AsyncWrite.html#tymethod.poll_flush
   
   It seems to me like a flush shouldn't return `Poll::Ready(Ok(()))` until the 
data is actually flushed (`current_buffer` is empty and `inner_state` is 
`AsyncPutState::Buffer`), though I am not an expert here 🤔 



##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -238,6 +239,30 @@ impl FileScanConfig {
     }
 }
 
+/// The base configurations to provide when creating a physical plan for
+/// any given file format.

Review Comment:
   ```suggestion
   /// The base configurations to provide when creating a physical plan for
   /// writing to any given file format.
   ```



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -87,6 +98,277 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// Take a list of files and the configuration to convert it to the
+    /// appropriate writer executor according to this file format.
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        _conf: FileSinkConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let msg = "Writer not implemented for this format".to_owned();
+        Err(DataFusionError::NotImplemented(msg))
+    }
+}
+
+/// `AsyncPutWriter` is an object that facilitates asynchronous writing to 
object stores.
+/// It is specifically designed for the `object_store` crate's `put` method 
and sends
+/// whole bytes at once when the buffer is flushed.
+pub struct AsyncPutWriter {
+    /// Object metadata
+    object_meta: ObjectMeta,
+    /// A shared reference to the object store
+    store: Arc<dyn ObjectStore>,
+    /// A buffer that stores the bytes to be sent
+    current_buffer: Vec<u8>,
+    /// Used for async handling in flush method
+    inner_state: AsyncPutState,
+}
+
+impl AsyncPutWriter {
+    /// Constructor for the `AsyncPutWriter` object
+    pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            object_meta,
+            store,
+            current_buffer: vec![],
+            // The writer starts out in buffering mode
+            inner_state: AsyncPutState::Buffer,
+        }
+    }
+
+    /// Separate implementation function that unpins the [`AsyncPutWriter`] so
+    /// that partial borrows work correctly
+    fn poll_shutdown_inner(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        loop {
+            match &mut self.inner_state {
+                AsyncPutState::Buffer => {
+                    // Convert the current buffer to bytes and take ownership 
of it
+                    let bytes = Bytes::from(mem::take(&mut 
self.current_buffer));
+                    // Set the inner state to Put variant with the bytes
+                    self.inner_state = AsyncPutState::Put { bytes }
+                }
+                AsyncPutState::Put { bytes } => {
+                    // Send the bytes to the object store's put method
+                    return Poll::Ready(
+                        ready!(self
+                            .store
+                            .put(&self.object_meta.location, bytes.clone())
+                            .poll_unpin(cx))
+                        .map_err(Error::from),
+                    );
+                }
+            }
+        }
+    }
+}
+
+/// An enum that represents the inner state of AsyncPut
+enum AsyncPutState {
+    /// Building Bytes struct in this state
+    Buffer,
+    /// Data in the buffer is being sent to the object store
+    Put { bytes: Bytes },
+}
+
+impl AsyncWrite for AsyncPutWriter {
+    // Define the implementation of the AsyncWrite trait for the 
`AsyncPutWriter` struct
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        // Extend the current buffer with the incoming buffer
+        self.current_buffer.extend_from_slice(buf);
+        // Return a ready poll with the length of the incoming buffer
+        Poll::Ready(Ok(buf.len()))
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        // Return a ready poll with an empty result
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        // Call the poll_shutdown_inner method to handle the actual sending of 
data to the object store
+        self.poll_shutdown_inner(cx)
+    }
+}
+
+/// A simple wrapper around an `AsyncWrite` type.

Review Comment:
   It took me a while to figure out what this was for, but now I see it is for 
adapting any `AsyncWrite` to add `FileWriterExt` with a default implementation 
of `abort_writer`. I suggest we at least add this as a comment.
   
   I suggest naming this struct `FileWriterExtAdapter` or something similar so 
its purpose can be more easily inferred from its name. 
   
   ```suggestion
   /// A simple wrapper around an `AsyncWrite` type that implements
   /// `FileWriterExt`
   ```



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -87,6 +98,277 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// Take a list of files and the configuration to convert it to the
+    /// appropriate writer executor according to this file format.
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        _conf: FileSinkConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let msg = "Writer not implemented for this format".to_owned();
+        Err(DataFusionError::NotImplemented(msg))
+    }
+}
+
+/// `AsyncPutWriter` is an object that facilitates asynchronous writing to 
object stores.
+/// It is specifically designed for the `object_store` crate's `put` method 
and sends
+/// whole bytes at once when the buffer is flushed.
+pub struct AsyncPutWriter {
+    /// Object metadata
+    object_meta: ObjectMeta,
+    /// A shared reference to the object store
+    store: Arc<dyn ObjectStore>,
+    /// A buffer that stores the bytes to be sent
+    current_buffer: Vec<u8>,
+    /// Used for async handling in flush method
+    inner_state: AsyncPutState,
+}
+
+impl AsyncPutWriter {
+    /// Constructor for the `AsyncPutWriter` object
+    pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            object_meta,
+            store,
+            current_buffer: vec![],
+            // The writer starts out in buffering mode
+            inner_state: AsyncPutState::Buffer,
+        }
+    }
+
+    /// Separate implementation function that unpins the [`AsyncPutWriter`] so
+    /// that partial borrows work correctly
+    fn poll_shutdown_inner(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        loop {

Review Comment:
   My reading of 
https://docs.rs/tokio/1.28.2/tokio/io/trait.AsyncWrite.html#tymethod.poll_shutdown
 is that, because the data is only written on `shutdown`, this writer as 
written will buffer everything before starting a write. 
   
   This is probably fine for the initial version, but it is probably something 
to improve in the future. 



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -87,6 +98,277 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// Take a list of files and the configuration to convert it to the
+    /// appropriate writer executor according to this file format.
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        _conf: FileSinkConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let msg = "Writer not implemented for this format".to_owned();
+        Err(DataFusionError::NotImplemented(msg))
+    }
+}
+
+/// `AsyncPutWriter` is an object that facilitates asynchronous writing to 
object stores.
+/// It is specifically designed for the `object_store` crate's `put` method 
and sends
+/// whole bytes at once when the buffer is flushed.
+pub struct AsyncPutWriter {
+    /// Object metadata
+    object_meta: ObjectMeta,
+    /// A shared reference to the object store
+    store: Arc<dyn ObjectStore>,
+    /// A buffer that stores the bytes to be sent
+    current_buffer: Vec<u8>,
+    /// Used for async handling in flush method
+    inner_state: AsyncPutState,
+}
+
+impl AsyncPutWriter {
+    /// Constructor for the `AsyncPutWriter` object
+    pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            object_meta,
+            store,
+            current_buffer: vec![],
+            // The writer starts out in buffering mode
+            inner_state: AsyncPutState::Buffer,
+        }
+    }
+
+    /// Separate implementation function that unpins the [`AsyncPutWriter`] so
+    /// that partial borrows work correctly
+    fn poll_shutdown_inner(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        loop {
+            match &mut self.inner_state {
+                AsyncPutState::Buffer => {
+                    // Convert the current buffer to bytes and take ownership 
of it
+                    let bytes = Bytes::from(mem::take(&mut 
self.current_buffer));
+                    // Set the inner state to Put variant with the bytes
+                    self.inner_state = AsyncPutState::Put { bytes }
+                }
+                AsyncPutState::Put { bytes } => {
+                    // Send the bytes to the object store's put method
+                    return Poll::Ready(
+                        ready!(self
+                            .store
+                            .put(&self.object_meta.location, bytes.clone())
+                            .poll_unpin(cx))
+                        .map_err(Error::from),
+                    );
+                }
+            }
+        }
+    }
+}
+
+/// An enum that represents the inner state of AsyncPut
+enum AsyncPutState {
+    /// Building Bytes struct in this state
+    Buffer,
+    /// Data in the buffer is being sent to the object store
+    Put { bytes: Bytes },
+}
+
+impl AsyncWrite for AsyncPutWriter {
+    // Define the implementation of the AsyncWrite trait for the 
`AsyncPutWriter` struct
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        // Extend the current buffer with the incoming buffer
+        self.current_buffer.extend_from_slice(buf);
+        // Return a ready poll with the length of the incoming buffer
+        Poll::Ready(Ok(buf.len()))
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        _: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        // Return a ready poll with an empty result
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_shutdown(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        // Call the poll_shutdown_inner method to handle the actual sending of 
data to the object store
+        self.poll_shutdown_inner(cx)
+    }
+}
+
+/// A simple wrapper around an `AsyncWrite` type.
+pub struct AsyncPut<W: AsyncWrite + Unpin + Send> {
+    writer: W,
+}
+
+impl<W: AsyncWrite + Unpin + Send> AsyncPut<W> {
+    /// Create a new `AsyncPut` instance with the given writer.
+    pub fn new(writer: W) -> Self {
+        Self { writer }
+    }
+}
+
+impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AsyncPut<W> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        Pin::new(&mut self.get_mut().writer).poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
+    }
+}
+
+impl<W: AsyncWrite + Unpin + Send> FileWriterExt for AsyncPut<W> {}
+
+/// An extension trait for `AsyncWrite` types that adds an `abort_writer` 
method.
+pub trait FileWriterExt: AsyncWrite + Unpin + Send {
+    /// Aborts the writer and returns a boxed future with the result.
+    /// The default implementation returns an immediately resolved future with 
`Ok(())`.
+    fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
+        Ok(async { Ok(()) }.boxed())
+    }
+}
+
+/// A wrapper around an `AsyncWrite` type that provides multipart upload 
functionality.
+pub struct AsyncPutMultipart<W: AsyncWrite + Unpin + Send> {
+    writer: W,
+    /// A shared reference to the object store
+    store: Arc<dyn ObjectStore>,
+    multipart_id: MultipartId,
+    location: Path,
+}
+
+impl<W: AsyncWrite + Unpin + Send> AsyncPutMultipart<W> {
+    /// Create a new `AsyncPutMultipart` instance with the given writer, 
object store, multipart ID, and location.
+    pub fn new(
+        writer: W,
+        store: Arc<dyn ObjectStore>,
+        multipart_id: MultipartId,
+        location: Path,
+    ) -> Self {
+        Self {
+            writer,
+            store,
+            multipart_id,
+            location,
+        }
+    }
+}
+
+impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AsyncPutMultipart<W> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
+    }
+
+    fn poll_flush(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        Pin::new(&mut self.get_mut().writer).poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
+    }
+}
+
+impl<W: AsyncWrite + Unpin + Send> FileWriterExt for AsyncPutMultipart<W> {
+    fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
+        let location = self.location.clone();
+        let multipart_id = self.multipart_id.clone();
+        let store = self.store.clone();
+        Ok(Box::pin(async move {
+            store
+                .abort_multipart(&location, &multipart_id)
+                .await
+                .map_err(DataFusionError::ObjectStore)
+        }))
+    }
+}
+
+/// A wrapper around an `AsyncWrite` type that provides append functionality.

Review Comment:
   This appears to be the same thing as `AsyncPut` and for the same purpose 
(adapting an `AsyncWrite` to `FileWriterExt`).
   
   I was able to remove `AsyncAppend` and use `AsyncPut` locally and everything 
still compiled and passed. I recommend they be consolidated.
   
   



##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -324,6 +349,188 @@ fn build_schema_helper(names: Vec<String>, types: 
&[HashSet<DataType>]) -> Schem
     Schema::new(fields)
 }
 
+impl Default for CsvSerializer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Define a struct for serializing CSV records to a stream
+pub struct CsvSerializer {
+    // CSV writer builder
+    builder: WriterBuilder,
+    // Inner buffer for avoiding reallocation
+    buffer: Vec<u8>,
+    // Flag to indicate whether there will be a header
+    header: bool,
+}
+
+impl CsvSerializer {
+    /// Constructor for the CsvSerializer object
+    pub fn new() -> Self {
+        Self {
+            builder: WriterBuilder::new(),
+            header: true,
+            buffer: Vec::with_capacity(4096),
+        }
+    }
+
+    /// Method for setting the CSV writer builder
+    pub fn with_builder(mut self, builder: WriterBuilder) -> Self {
+        self.builder = builder;
+        self
+    }
+
+    /// Method for setting the CSV writer header status
+    pub fn with_header(mut self, header: bool) -> Self {
+        self.header = header;
+        self
+    }
+}
+
+#[async_trait]
+impl BatchSerializer for CsvSerializer {
+    async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+        let builder = self.builder.clone();
+        let mut writer = builder.has_headers(self.header).build(&mut 
self.buffer);
+        writer.write(&batch)?;
+        drop(writer);
+        self.header = false;
+        Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
+    }
+}
+
+/// This macro tries to abort all writers before returning a possible error.
+macro_rules! handle_err_or_continue {
+    ($result:expr, $writers:expr) => {
+        match $result {
+            Ok(value) => Ok(value),
+            Err(e) => {
+                // Abort all writers before returning the error:
+                for writer in $writers {
+                    let abort_future = writer.abort_writer()?;

Review Comment:
   I think this could be a function (which would make it easier to work with). 
Maybe like
   
   ```rust
   fn check_for_errors(result: Result<RecordBatch>) -> Result<RecordBatch> {
   ...
   }
   ```
   
   Also, reading it, I am not sure about the error handling. Specifically: 
   1. If the abort fails, the abort error may be reported rather than the 
initial write error.
   2. If canceling one writer fails, the others may not be aborted (as `?` 
returns early).
   
   I have some ideas on how to refactor this (into a struct) and to make it 
more testable,  which I will try and put up as a draft PR shortly



##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -87,6 +98,277 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
         conf: FileScanConfig,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// Take a list of files and the configuration to convert it to the
+    /// appropriate writer executor according to this file format.
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        _conf: FileSinkConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let msg = "Writer not implemented for this format".to_owned();
+        Err(DataFusionError::NotImplemented(msg))
+    }
+}
+
+/// `AsyncPutWriter` is an object that facilitates asynchronous writing to 
object stores.
+/// It is specifically designed for the `object_store` crate's `put` method 
and sends
+/// whole bytes at once when the buffer is flushed.
+pub struct AsyncPutWriter {

Review Comment:
   @tustvold and/or @crepererum (my go-to people for Rust async / stream 
expertise): I wonder if you have some time to review this code that adapts 
DataFusion to use the `object_store` multipart put feature to do streaming 
writes?
   
   Pretty exciting!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to