This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 21a45c79 fix: kv adapter's writer implementation fixed to honour empty writes (#1934)
21a45c79 is described below

commit 21a45c79fe4f78f4b9dcb76689b5c37fa440824f
Author: దామోదర <[email protected]>
AuthorDate: Thu Apr 13 15:50:38 2023 +0530

    fix: kv adapter's writer implementation fixed to honour empty writes (#1934)
---
 core/src/raw/adapters/kv/backend.rs | 28 +++++++++++++++++++---------
 1 file changed, 19 insertions(+), 9 deletions(-)

diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index 7e6ea81a..2395eb4d 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -253,7 +253,7 @@ pub struct KvWriter<S> {
     path: String,
 
     /// TODO: if kv supports append, we can use them directly.
-    buf: Vec<u8>,
+    buf: Option<Vec<u8>>,
 }
 
 impl<S> KvWriter<S> {
@@ -261,7 +261,15 @@ impl<S> KvWriter<S> {
         KvWriter {
             kv,
             path,
-            buf: Vec::new(),
+            buf: None,
+        }
+    }
+
+    fn extend_buf(&mut self, bs: Bytes) {
+        if let Some(buf) = self.buf.as_mut() {
+            buf.extend(bs);
+        } else {
+            self.buf = Some(bs.into())
         }
     }
 }
@@ -269,7 +277,7 @@ impl<S> KvWriter<S> {
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
     async fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.buf = bs.into();
+        self.buf = Some(bs.into());
 
         Ok(())
     }
@@ -277,7 +285,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
     async fn append(&mut self, bs: Bytes) -> Result<()> {
         if let Err(e) = self.kv.append(&self.path, bs.to_vec().as_slice()).await {
             if e.kind() == ErrorKind::Unsupported {
-                self.buf.extend(bs);
+                self.extend_buf(bs);
             } else {
                 return Err(e);
             }
@@ -286,8 +294,8 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
     }
 
     async fn close(&mut self) -> Result<()> {
-        if !self.buf.is_empty() {
-            self.kv.set(&self.path, &self.buf).await?;
+        if let Some(buf) = self.buf.as_deref() {
+            self.kv.set(&self.path, buf).await?;
         }
 
         Ok(())
@@ -296,19 +304,21 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
 
 impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
     fn write(&mut self, bs: Bytes) -> Result<()> {
-        self.buf = bs.into();
+        self.buf = Some(bs.into());
 
         Ok(())
     }
 
     fn append(&mut self, bs: Bytes) -> Result<()> {
-        self.buf.extend(bs);
+        self.extend_buf(bs);
 
         Ok(())
     }
 
     fn close(&mut self) -> Result<()> {
-        self.kv.blocking_set(&self.path, &self.buf)?;
+        if let Some(buf) = self.buf.as_deref() {
+            self.kv.blocking_set(&self.path, buf)?;
+        }
 
         Ok(())
     }

Reply via email to