Xuanwo commented on code in PR #5803:
URL: https://github.com/apache/opendal/pull/5803#discussion_r2000543794


##########
core/src/services/compfs/writer.rs:
##########
@@ -44,27 +48,35 @@ impl oio::Write for CompfsWriter {
     /// the write_all doesn't work correctly if `bs` is non-contiguous.
     ///
     /// The IoBuf::buf_len() only returns the length of the current buffer.
-    async fn write(&mut self, bs: Buffer) -> Result<()> {
-        let mut file = self.file.clone();
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
+        let mut file = self.file.clone().unwrap();
 
-        self.core
+        let pos = self
+            .core
             .exec(move || async move {
-                buf_try!(@try file.write_all(bs).await);
-                Ok(())
+                while bs.has_remaining() {

Review Comment:
   `Buffer` implements `Iterator<Item=Bytes>`, so we can use `for b in bs` here
to avoid an extra copy of the data.



##########
core/src/services/compfs/writer.rs:
##########
@@ -44,27 +48,35 @@ impl oio::Write for CompfsWriter {
     /// the write_all doesn't work correctly if `bs` is non-contiguous.
     ///
     /// The IoBuf::buf_len() only returns the length of the current buffer.
-    async fn write(&mut self, bs: Buffer) -> Result<()> {
-        let mut file = self.file.clone();
+    async fn write(&mut self, mut bs: Buffer) -> Result<()> {
+        let mut file = self.file.clone().unwrap();
 
-        self.core
+        let pos = self
+            .core
             .exec(move || async move {
-                buf_try!(@try file.write_all(bs).await);
-                Ok(())
+                while bs.has_remaining() {
+                    let len = bs.chunk().len();
+                    let res;
+                    BufResult(res, bs) = file.write_all(bs).await;
+                    res?;
+                    bs.advance(len);
+                }
+                Ok(file.position())
             })
             .await?;
+        self.file.as_mut().unwrap().set_position(pos);
 
         Ok(())
     }
 
     async fn close(&mut self) -> Result<Metadata> {
-        let f = self.file.clone();
+        let f = self.file.clone().unwrap();
 
         self.core
             .exec(move || async move { f.get_ref().sync_all().await })
             .await?;
 
-        let f = self.file.clone();
+        let f = self.file.take().unwrap();

Review Comment:
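   Any method of the writer may be called again after it has returned an error,
because of retries. Once `take()` has left `self.file` as `None`, a re-entered
call would panic on `unwrap()`.
   
   It's better to use `if let Some(xx) = self.file.take()` instead, and return
an error such as `file has closed` when the file is already gone.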



##########
core/src/services/compfs/writer.rs:
##########
@@ -73,6 +85,9 @@ impl oio::Write for CompfsWriter {
     }
 
     async fn abort(&mut self) -> Result<()> {
-        Ok(())
+        Err(Error::new(
+            ErrorKind::Unsupported,
+            "cannot abort completion-based operations",

Review Comment:
   The `abort` here does not have the same semantics as in io-uring. Leaving it 
as `Ok(())` is sufficient.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@opendal.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to