ozankabak commented on code in PR #6526:
URL: https://github.com/apache/arrow-datafusion/pull/6526#discussion_r1217420826
##########
datafusion/core/src/datasource/file_format/mod.rs:
##########
@@ -87,6 +98,277 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
conf: FileScanConfig,
filters: Option<&Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn ExecutionPlan>>;
+
+    /// Create a writer [`ExecutionPlan`] for this file format from the `input`
+    /// plan, the session `state` and the sink configuration `conf`.
+    ///
+    /// The default implementation returns
+    /// [`DataFusionError::NotImplemented`]; file formats that support writing
+    /// override it.
+    async fn create_writer_physical_plan(
+        &self,
+        _input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        _conf: FileSinkConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let msg = "Writer not implemented for this format".to_owned();
+        Err(DataFusionError::NotImplemented(msg))
+    }
+}
+
+/// `AsyncPutWriter` is an [`AsyncWrite`] adapter for object stores that uses
+/// a single `put` call of the `object_store` crate.
+///
+/// Every write is appended to an in-memory buffer. Nothing is sent until the
+/// writer is shut down; at that point the whole buffer is uploaded at once.
+/// `poll_flush` does not upload anything.
+pub struct AsyncPutWriter {
+    /// Object metadata; its `location` is the upload target
+    object_meta: ObjectMeta,
+    /// A shared reference to the object store
+    store: Arc<dyn ObjectStore>,
+    /// A buffer that stores the bytes to be sent
+    current_buffer: Vec<u8>,
+    /// Tracks the progress of the upload performed during shutdown
+    inner_state: AsyncPutState,
+}
+
+impl AsyncPutWriter {
+    /// Constructor for the `AsyncPutWriter` object
+    pub fn new(object_meta: ObjectMeta, store: Arc<dyn ObjectStore>) -> Self {
+        Self {
+            object_meta,
+            store,
+            current_buffer: vec![],
+            // The writer starts out in buffering mode
+            inner_state: AsyncPutState::Buffer,
+        }
+    }
+
+    /// Separate implementation function that unpins the [`AsyncPutWriter`] so
+    /// that partial borrows work correctly.
+    ///
+    /// On the first call, the buffered bytes are moved into a single upload
+    /// future. That same future is polled on every call until it completes.
+    /// Once it has completed, further calls return `Ok(())` without uploading
+    /// again.
+    fn poll_shutdown_inner(
+        &mut self,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        loop {
+            match &mut self.inner_state {
+                AsyncPutState::Buffer => {
+                    // Convert the current buffer to bytes and take ownership of it
+                    let bytes = Bytes::from(mem::take(&mut self.current_buffer));
+                    // The future owns clones of the store handle and location, so
+                    // it does not borrow `self` and can be kept across polls.
+                    let store = Arc::clone(&self.store);
+                    let location = self.object_meta.location.clone();
+                    let put = async move {
+                        store.put(&location, bytes).await.map_err(Error::from)
+                    };
+                    self.inner_state = AsyncPutState::Put { put: Box::pin(put) };
+                }
+                AsyncPutState::Put { put } => {
+                    // Poll the stored future. Creating a fresh `put` future here on
+                    // every poll would restart the upload each time it is `Pending`.
+                    let result = ready!(put.poll_unpin(cx));
+                    // A completed future must not be polled again
+                    self.inner_state = AsyncPutState::Done;
+                    return Poll::Ready(result);
+                }
+                AsyncPutState::Done => return Poll::Ready(Ok(())),
+            }
+        }
+    }
+}
+
+/// An enum that represents the inner state of AsyncPut
+enum AsyncPutState {
+    /// Building the buffer; writes are still being accepted
+    Buffer,
+    /// Data in the buffer is being sent to the object store. The upload future
+    /// is created once and kept here until it resolves.
+    Put {
+        put: Pin<
+            Box<dyn std::future::Future<Output = std::result::Result<(), Error>> + Send>,
+        >,
+    },
+    /// The upload has finished; shutdown is a no-op from here on
+    Done,
+}
+
+impl AsyncWrite for AsyncPutWriter {
+ // Define the implementation of the AsyncWrite trait for the
`AsyncPutWriter` struct
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<std::result::Result<usize, Error>> {
+ // Extend the current buffer with the incoming buffer
+ self.current_buffer.extend_from_slice(buf);
+ // Return a ready poll with the length of the incoming buffer
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ fn poll_flush(
+ self: Pin<&mut Self>,
+ _: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ // Return a ready poll with an empty result
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ // Call the poll_shutdown_inner method to handle the actual sending of
data to the object store
+ self.poll_shutdown_inner(cx)
+ }
+}
+
+/// A simple wrapper around an `AsyncWrite` type.
Review Comment:
Done, 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]