alamb commented on code in PR #6526:
URL: https://github.com/apache/arrow-datafusion/pull/6526#discussion_r1219418581


##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -205,136 +205,74 @@ impl AsyncWrite for AsyncPutWriter {
     }
 }
 
-/// An extension trait for `AsyncWrite` types that adds an `abort_writer` 
method.
-pub trait FileWriterExt: AsyncWrite + Unpin + Send {
-    /// Aborts the writer and returns a boxed future with the result.
-    /// The default implementation returns an immediately resolved future with 
`Ok(())`.
-    fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
-        Err(DataFusionError::Execution(
-            "Abort handling is not implemented".to_string(),
-        ))
-    }
-}
-
-/// A simple wrapper around an `AsyncWrite` type that implements 
`FileWriterExt`.
-pub struct AsyncPut<W: AsyncWrite + Unpin + Send> {
-    writer: W,
-}
-
-impl<W: AsyncWrite + Unpin + Send> AsyncPut<W> {
-    /// Create a new `AsyncPut` instance with the given writer.
-    pub fn new(writer: W) -> Self {
-        Self { writer }
-    }
-}
-
-impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AsyncPut<W> {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
-    }
-
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_flush(cx)
-    }
-
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
-    }
-}
-
-impl<W: AsyncWrite + Unpin + Send> FileWriterExt for AsyncPut<W> {
-    fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
-        Ok(async { Ok(()) }.boxed())
-    }
-}
-
-/// A wrapper around an `AsyncWrite` type that provides multipart upload 
functionality.
-pub struct AsyncPutMultipart<W: AsyncWrite + Unpin + Send> {
-    writer: W,
+/// Stores data needed during abortion of MultiPart writers
+pub(crate) struct MultiPart {
     /// A shared reference to the object store
     store: Arc<dyn ObjectStore>,
     multipart_id: MultipartId,
     location: Path,
 }
 
-impl<W: AsyncWrite + Unpin + Send> AsyncPutMultipart<W> {
-    /// Create a new `AsyncPutMultipart` instance with the given writer, 
object store, multipart ID, and location.
+impl MultiPart {
+    /// Create a new `MultiPart`
     pub fn new(
-        writer: W,
         store: Arc<dyn ObjectStore>,
         multipart_id: MultipartId,
         location: Path,
     ) -> Self {
         Self {
-            writer,
             store,
             multipart_id,
             location,
         }
     }
 }
 
-impl<W: AsyncWrite + Unpin + Send> AsyncWrite for AsyncPutMultipart<W> {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::result::Result<usize, Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_write(cx, buf)
-    }
-
-    fn poll_flush(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_flush(cx)
-    }
-
-    fn poll_shutdown(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-    ) -> Poll<std::result::Result<(), Error>> {
-        Pin::new(&mut self.get_mut().writer).poll_shutdown(cx)
-    }
-}
-
-impl<W: AsyncWrite + Unpin + Send> FileWriterExt for AsyncPutMultipart<W> {
-    fn abort_writer(&self) -> Result<BoxFuture<'static, Result<()>>> {
-        let location = self.location.clone();
-        let multipart_id = self.multipart_id.clone();
-        let store = self.store.clone();
-        Ok(Box::pin(async move {
-            store
-                .abort_multipart(&location, &multipart_id)
-                .await
-                .map_err(DataFusionError::ObjectStore)
-        }))
-    }
+pub(crate) enum AbortMode {
+    Put,
+    Append,
+    MultiPart(MultiPart),
 }
 
-/// A wrapper around an `AsyncWrite` type that provides append functionality.
-pub struct AsyncAppend<W: AsyncWrite + Unpin + Send> {
+/// A wrapper struct with abort method and writer
+struct AbortableWrite<W: AsyncWrite + Unpin + Send> {

Review Comment:
   👍  I think this construction makes the intent of the code much clearer. 
Thank you



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to