Xuanwo commented on code in PR #5803: URL: https://github.com/apache/opendal/pull/5803#discussion_r2000543794
########## core/src/services/compfs/writer.rs: ########## @@ -44,27 +48,35 @@ impl oio::Write for CompfsWriter { /// the write_all doesn't work correctly if `bs` is non-contiguous. /// /// The IoBuf::buf_len() only returns the length of the current buffer. - async fn write(&mut self, bs: Buffer) -> Result<()> { - let mut file = self.file.clone(); + async fn write(&mut self, mut bs: Buffer) -> Result<()> { + let mut file = self.file.clone().unwrap(); - self.core + let pos = self + .core .exec(move || async move { - buf_try!(@try file.write_all(bs).await); - Ok(()) + while bs.has_remaining() { Review Comment: `Buffer` implements `Iterator<Item=Bytes>`, we can use `for b in bs` here to avoid extra copy of data. ########## core/src/services/compfs/writer.rs: ########## @@ -44,27 +48,35 @@ impl oio::Write for CompfsWriter { /// the write_all doesn't work correctly if `bs` is non-contiguous. /// /// The IoBuf::buf_len() only returns the length of the current buffer. - async fn write(&mut self, bs: Buffer) -> Result<()> { - let mut file = self.file.clone(); + async fn write(&mut self, mut bs: Buffer) -> Result<()> { + let mut file = self.file.clone().unwrap(); - self.core + let pos = self + .core .exec(move || async move { - buf_try!(@try file.write_all(bs).await); - Ok(()) + while bs.has_remaining() { + let len = bs.chunk().len(); + let res; + BufResult(res, bs) = file.write_all(bs).await; + res?; + bs.advance(len); + } + Ok(file.position()) }) .await?; + self.file.as_mut().unwrap().set_position(pos); Ok(()) } async fn close(&mut self) -> Result<Metadata> { - let f = self.file.clone(); + let f = self.file.clone().unwrap(); self.core .exec(move || async move { f.get_ref().sync_all().await }) .await?; - let f = self.file.clone(); + let f = self.file.take().unwrap(); Review Comment: All functions of the writer might re-enter even after returning errors due to retries. It's better to use `if let Some(xx) = self.file.take()` instead. We can return errors like `file has closed` in such cases. ########## core/src/services/compfs/writer.rs: ########## @@ -73,6 +85,9 @@ impl oio::Write for CompfsWriter { } async fn abort(&mut self) -> Result<()> { - Ok(()) + Err(Error::new( + ErrorKind::Unsupported, + "cannot abort completion-based operations", Review Comment: The `abort` here does not have the same semantics as in io-uring. Leaving it as `Ok(())` is sufficient. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@opendal.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org