Xuanwo commented on code in PR #5431:
URL: https://github.com/apache/arrow-rs/pull/5431#discussion_r1503427492
##########
object_store/src/buffered.rs:
##########
@@ -205,6 +205,138 @@ impl AsyncBufRead for BufReader {
}
}
+/// An async buffered writer compatible with the tokio IO traits
+///
+/// Up to `capacity` bytes will be buffered in memory, and flushed on shutdown
+/// using [`ObjectStore::put`]. If `capacity` is exceeded, data will instead be
+/// streamed using [`ObjectStore::put_multipart`]
+pub struct BufWriter {
+ capacity: usize,
+ state: BufWriterState,
+ multipart_id: Option<MultipartId>,
+ store: Arc<dyn ObjectStore>,
+}
+
+impl std::fmt::Debug for BufWriter {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("BufWriter")
+ .field("capacity", &self.capacity)
+ .field("multipart_id", &self.multipart_id)
+ .finish()
+ }
+}
+
+type MultipartResult = (MultipartId, Box<dyn AsyncWrite + Send + Unpin>);
+
+enum BufWriterState {
+ /// Buffer up to capacity bytes
+ Buffer(Path, Vec<u8>),
+ /// [`ObjectStore::put_multipart`]
+ Prepare(BoxFuture<'static, std::io::Result<MultipartResult>>),
+ /// Write to a multipart upload
+ Write(Box<dyn AsyncWrite + Send + Unpin>),
+ /// [`ObjectStore::put`]
+ Put(BoxFuture<'static, std::io::Result<()>>),
+}
+
+impl BufWriter {
+ /// Create a new [`BufWriter`] from the provided [`ObjectStore`] and
[`Path`]
+ pub fn new(store: Arc<dyn ObjectStore>, path: Path) -> Self {
+ Self::with_capacity(store, path, 10 * 1024 * 1024)
+ }
+
+ /// Create a new [`BufWriter`] from the provided [`ObjectStore`], [`Path`]
and `capacity`
+ pub fn with_capacity(store: Arc<dyn ObjectStore>, path: Path, capacity:
usize) -> Self {
+ Self {
+ capacity,
+ store,
+ state: BufWriterState::Buffer(path, Vec::with_capacity(1024)),
+ multipart_id: None,
+ }
+ }
+
+ /// Returns the [`MultipartId`] if multipart upload
+ pub fn multipart_id(&self) -> Option<&MultipartId> {
+ self.multipart_id.as_ref()
+ }
+}
+
+impl AsyncWrite for BufWriter {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<Result<usize, Error>> {
+ let cap = self.capacity;
+ loop {
+ return match &mut self.state {
+ BufWriterState::Write(write) => Pin::new(write).poll_write(cx,
buf),
+ BufWriterState::Put(_) => panic!("Already shut down"),
+ BufWriterState::Prepare(f) => {
+ let (id, w) = ready!(f.poll_unpin(cx)?);
+ self.state = BufWriterState::Write(w);
+ self.multipart_id = Some(id);
+ continue;
+ }
+ BufWriterState::Buffer(path, b) => {
+ if b.len().saturating_add(buf.len()) >= cap {
+ let buffer = std::mem::take(b);
+ let path = std::mem::take(path);
+ let store = Arc::clone(&self.store);
+ self.state = BufWriterState::Prepare(Box::pin(async
move {
+ let (id, mut writer) =
store.put_multipart(&path).await?;
+ writer.write_all(&buffer).await?;
+ Ok((id, writer))
+ }));
+ continue;
+ }
+ b.extend_from_slice(buf);
+ Poll::Ready(Ok(buf.len()))
+ }
+ };
+ }
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Result<(), Error>> {
+ loop {
+ return match &mut self.state {
+ BufWriterState::Buffer(_, _) => Poll::Ready(Ok(())),
+ BufWriterState::Write(write) => Pin::new(write).poll_flush(cx),
+ BufWriterState::Put(_) => panic!("Already shut down"),
+ BufWriterState::Prepare(f) => {
+ let (id, w) = ready!(f.poll_unpin(cx)?);
+ self.state = BufWriterState::Write(w);
+ self.multipart_id = Some(id);
+ continue;
+ }
+ };
+ }
+ }
+
+ fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Result<(), Error>> {
+ loop {
+ match &mut self.state {
+ BufWriterState::Prepare(f) => {
+ let (id, w) = ready!(f.poll_unpin(cx)?);
+ self.state = BufWriterState::Write(w);
+ self.multipart_id = Some(id);
+ }
+ BufWriterState::Buffer(p, b) => {
+ let buf = std::mem::take(b);
Review Comment:
Apologies for the confusion caused by my choice of the word "safe". I'm
talking about this case:
- Write some data into the buffer.
- Call `poll_shutdown`, which returns an error such as `500 Internal Server Error`.
- Call `poll_shutdown` again to retry.
But the `buf` has already been taken (via `std::mem::take`) in the first call,
so the final content could be wrong after the retry.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]