This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 21a45c79 fix: kv adapter's writer implementation fixed to honour empty writes (#1934)
21a45c79 is described below
commit 21a45c79fe4f78f4b9dcb76689b5c37fa440824f
Author: దామోదర <[email protected]>
AuthorDate: Thu Apr 13 15:50:38 2023 +0530
fix: kv adapter's writer implementation fixed to honour empty writes (#1934)
---
core/src/raw/adapters/kv/backend.rs | 28 +++++++++++++++++++---------
1 file changed, 19 insertions(+), 9 deletions(-)
diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index 7e6ea81a..2395eb4d 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -253,7 +253,7 @@ pub struct KvWriter<S> {
path: String,
/// TODO: if kv supports append, we can use them directly.
- buf: Vec<u8>,
+ buf: Option<Vec<u8>>,
}
impl<S> KvWriter<S> {
@@ -261,7 +261,15 @@ impl<S> KvWriter<S> {
KvWriter {
kv,
path,
- buf: Vec::new(),
+ buf: None,
+ }
+ }
+
+ fn extend_buf(&mut self, bs: Bytes) {
+ if let Some(buf) = self.buf.as_mut() {
+ buf.extend(bs);
+ } else {
+ self.buf = Some(bs.into())
}
}
}
@@ -269,7 +277,7 @@ impl<S> KvWriter<S> {
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
async fn write(&mut self, bs: Bytes) -> Result<()> {
- self.buf = bs.into();
+ self.buf = Some(bs.into());
Ok(())
}
@@ -277,7 +285,7 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
async fn append(&mut self, bs: Bytes) -> Result<()> {
if let Err(e) = self.kv.append(&self.path,
bs.to_vec().as_slice()).await {
if e.kind() == ErrorKind::Unsupported {
- self.buf.extend(bs);
+ self.extend_buf(bs);
} else {
return Err(e);
}
@@ -286,8 +294,8 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}
async fn close(&mut self) -> Result<()> {
- if !self.buf.is_empty() {
- self.kv.set(&self.path, &self.buf).await?;
+ if let Some(buf) = self.buf.as_deref() {
+ self.kv.set(&self.path, buf).await?;
}
Ok(())
@@ -296,19 +304,21 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
impl<S: Adapter> oio::BlockingWrite for KvWriter<S> {
fn write(&mut self, bs: Bytes) -> Result<()> {
- self.buf = bs.into();
+ self.buf = Some(bs.into());
Ok(())
}
fn append(&mut self, bs: Bytes) -> Result<()> {
- self.buf.extend(bs);
+ self.extend_buf(bs);
Ok(())
}
fn close(&mut self) -> Result<()> {
- self.kv.blocking_set(&self.path, &self.buf)?;
+ if let Some(buf) = self.buf.as_deref() {
+ self.kv.blocking_set(&self.path, buf)?;
+ }
Ok(())
}