Ji-Xinyou commented on code in PR #2987: URL: https://github.com/apache/incubator-opendal/pull/2987#discussion_r1311804064
########## core/src/raw/oio/write/bounded_buf_write.rs: ########## @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::min; + +use async_trait::async_trait; +use bytes::Bytes; + +use crate::raw::*; +use crate::*; + +/// BoundedBufWriter is used to implement [`oio::Write`] based on bounded buffer strategy: flush the +/// underlying storage when the buffered size is between the range of [min_buffer..max_buffer] +pub struct BoundedBufWriter<W: oio::Write> { + inner: W, + + min_buffer: usize, + max_buffer: usize, + buffer: oio::ChunkedCursor, +} + +impl<W: oio::Write> BoundedBufWriter<W> { + /// Create a new exact buf writer. + pub fn new(inner: W, min_buffer: usize) -> Self { + Self { + inner, + min_buffer, + max_buffer: usize::MAX, + buffer: oio::ChunkedCursor::new(), + } + } + + /// Configure the max buffer size for writer. + /// + /// # Panics + /// + /// Panic if max_buffer is smaller than min_buffer. 
+ pub fn with_max_buffer(mut self, max_buffer: usize) -> Self { + assert!( + max_buffer >= self.min_buffer, + "input max buffer is smaller than min buffer" + ); + + self.max_buffer = max_buffer; + self + } +} + +#[async_trait] +impl<W: oio::Write> oio::Write for BoundedBufWriter<W> { + /// # TODO + /// + /// - Use copy_from_slice if given bytes is smaller than 4KiB. + async fn write(&mut self, bs: Bytes) -> Result<()> { + // Make sure the existing buffer has been flushed. + while self.buffer.len() >= self.max_buffer { + let mut buf = self.buffer.clone(); + let to_write = buf.split_to(self.max_buffer); + + if let Some(bs) = to_write.try_single() { Review Comment: I am curious why we are adding this separate branch for the case we have only 1 byte? ########## core/src/raw/oio/write/bounded_buf_write.rs: ########## @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::cmp::min; + +use async_trait::async_trait; +use bytes::Bytes; + +use crate::raw::*; +use crate::*; + +/// BoundedBufWriter is used to implement [`oio::Write`] based on bounded buffer strategy: flush the +/// underlying storage when the buffered size is between the range of [min_buffer..max_buffer] +pub struct BoundedBufWriter<W: oio::Write> { + inner: W, + + min_buffer: usize, Review Comment: I would say name this `min_buffer_size` ########## core/src/raw/oio/write/bounded_buf_write.rs: ########## @@ -0,0 +1,254 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cmp::min; + +use async_trait::async_trait; +use bytes::Bytes; + +use crate::raw::*; +use crate::*; + +/// BoundedBufWriter is used to implement [`oio::Write`] based on bounded buffer strategy: flush the +/// underlying storage when the buffered size is between the range of [min_buffer..max_buffer] +pub struct BoundedBufWriter<W: oio::Write> { + inner: W, + + min_buffer: usize, + max_buffer: usize, + buffer: oio::ChunkedCursor, +} + +impl<W: oio::Write> BoundedBufWriter<W> { + /// Create a new exact buf writer. 
+ pub fn new(inner: W, min_buffer: usize) -> Self { + Self { + inner, + min_buffer, + max_buffer: usize::MAX, + buffer: oio::ChunkedCursor::new(), + } + } + + /// Configure the max buffer size for writer. + /// + /// # Panics + /// + /// Panic if max_buffer is smaller than min_buffer. + pub fn with_max_buffer(mut self, max_buffer: usize) -> Self { + assert!( + max_buffer >= self.min_buffer, + "input max buffer is smaller than min buffer" + ); + + self.max_buffer = max_buffer; + self + } +} + +#[async_trait] +impl<W: oio::Write> oio::Write for BoundedBufWriter<W> { + /// # TODO + /// + /// - Use copy_from_slice if given bytes is smaller than 4KiB. + async fn write(&mut self, bs: Bytes) -> Result<()> { Review Comment: For this function, I understand this as 1. if buffer is already `>= self.max_buffer`, we flush the "overflowing" part 2. if buffer + bs is `>= self.min_buffer`, flush the extra part. I wonder why we don't just flush the entire buffer until the buffer size meets `self.min_buffer`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
