mustafasrepo commented on code in PR #6526:
URL: https://github.com/apache/arrow-datafusion/pull/6526#discussion_r1218090461
##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -87,6 +98,277 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>>;
+
+ /// Take a list of files and the configuration to convert it to the
+ /// appropriate writer executor according to this file format.
+ async fn create_writer_physical_plan(
+ &self,
+ _input: Arc<dyn ExecutionPlan>,
+ _state: &SessionState,
+ _conf: FileSinkConfig,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let msg = "Writer not implemented for this format".to_owned();
+ Err(DataFusionError::NotImplemented(msg))
+ }
+}
+
+/// `AsyncPutWriter` is an object that facilitates asynchronous writing to object stores.
+/// It is specifically designed for the `object_store` crate's `put` method and sends
+/// whole bytes at once when the buffer is flushed.
+pub struct AsyncPutWriter {
+ /// Object metadata
+ object_meta: ObjectMeta,
+ /// A shared reference to the object store
+ store: Arc<dyn ObjectStore>,
+ /// A buffer that stores the bytes to be sent
+ current_buffer: Vec<u8>,
+ /// Used for async handling in flush method
+ inner_state: AsyncPutState,
+}
+
+impl AsyncPutWriter {
+ /// Constructor for the `AsyncPutWriter` object
+ pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
+ Self {
+ object_meta,
+ store,
+ current_buffer: vec![],
+ // The writer starts out in buffering mode
+ inner_state: AsyncPutState::Buffer,
+ }
+ }
+
+ /// Separate implementation function that unpins the [`AsyncPutWriter`] so
+ /// that partial borrows work correctly
+ fn poll_shutdown_inner(
+ &mut self,
+ cx: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ loop {
+ match &mut self.inner_state {
+ AsyncPutState::Buffer => {
+ // Convert the current buffer to bytes and take ownership of it
+ let bytes = Bytes::from(mem::take(&mut
self.current_buffer));
+ // Set the inner state to Put variant with the bytes
+ self.inner_state = AsyncPutState::Put { bytes }
+ }
+ AsyncPutState::Put { bytes } => {
+ // Send the bytes to the object store's put method
+ return Poll::Ready(
+ ready!(self
+ .store
+ .put(&self.object_meta.location, bytes.clone())
+ .poll_unpin(cx))
+ .map_err(Error::from),
+ );
+ }
+ }
+ }
+ }
+}
+
+/// An enum that represents the inner state of AsyncPut
+enum AsyncPutState {
+ /// Building Bytes struct in this state
+ Buffer,
+ /// Data in the buffer is being sent to the object store
+ Put { bytes: Bytes },
+}
+
+impl AsyncWrite for AsyncPutWriter {
+ // Define the implementation of the AsyncWrite trait for the `AsyncPutWriter` struct
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<std::result::Result<usize, Error>> {
+ // Extend the current buffer with the incoming buffer
+ self.current_buffer.extend_from_slice(buf);
+ // Return a ready poll with the length of the incoming buffer
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ // Return a ready poll with an empty result
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ // Call the poll_shutdown_inner method to handle the actual sending of data to the object store
+ self.poll_shutdown_inner(cx)
+ }
+}
+
+/// A simple wrapper around an `AsyncWrite` type that implements `FileWriterExt`.
+pub struct AsyncPut<W: AsyncWrite + Unpin + Send> {
+ writer: W,
+}
+
+impl<W: AsyncWrite + Unpin + Send> AsyncPut<W> {
+ /// Create a new `AsyncPut` instance with the given writer.
+ pub fn new(writer: W) -> Self {
+ Self { writer }
+ }
+}
+
+impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AsyncPut<W> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<std::result::Result<usize, Error>> {
+ Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ Pin::new(&mut self.get_mut().writer).poll_flush(cx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
+ }
+}
+
+impl<W: AsyncWrite + Unpin + Send> FileWriterExt for AsyncPut<W> {}
+
+/// An extension trait for `AsyncWrite` types that adds an `abort_writer` method.
+pub trait FileWriterExt: AsyncWrite + Unpin + Send {
+ /// Aborts the writer and returns a boxed future with the result.
+ /// The default implementation returns an immediately resolved future with `Ok(())`.
+ fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
+ Ok(async { Ok(()) }.boxed())
+ }
+}
+
+/// A wrapper around an `AsyncWrite` type that provides multipart upload functionality.
+pub struct AsyncPutMultipart<W: AsyncWrite + Unpin + Send> {
+ writer: W,
+ /// A shared reference to the object store
+ store: Arc<dyn ObjectStore>,
+ multipart_id: MultipartId,
+ location: Path,
+}
+
+impl<W: AsyncWrite + Unpin + Send> AsyncPutMultipart<W> {
+ /// Create a new `AsyncPutMultipart` instance with the given writer, object store, multipart ID, and location.
+ pub fn new(
+ writer: W,
+ store: Arc<dyn ObjectStore>,
+ multipart_id: MultipartId,
+ location: Path,
+ ) -> Self {
+ Self {
+ writer,
+ store,
+ multipart_id,
+ location,
+ }
+ }
+}
+
+impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AsyncPutMultipart<W> {
+ fn poll_write(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<std::result::Result<usize, Error>> {
+ Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ Pin::new(&mut self.get_mut().writer).poll_flush(cx)
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
+ }
+}
+
+impl<W: AsyncWrite + Unpin + Send> FileWriterExt for AsyncPutMultipart<W> {
+ fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
+ let location = self.location.clone();
+ let multipart_id = self.multipart_id.clone();
+ let store = self.store.clone();
+ Ok(Box::pin(async move {
+ store
+ .abort_multipart(&location, &multipart_id)
+ .await
+ .map_err(DataFusionError::ObjectStore)
+ }))
+ }
+}
+
+/// A wrapper around an `AsyncWrite` type that provides append functionality.
+pub struct AsyncAppend<W: AsyncWrite + Unpin + Send> {
Review Comment:
I now raise an error in append mode.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]