This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/stream-based-write by this push:
     new f614704bb Fix kv
f614704bb is described below

commit f614704bb2af202e211c2a0d05cd45c07a78d6c8
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 19:33:03 2023 +0800

    Fix kv
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/raw/adapters/kv/backend.rs       | 24 +++++++++++++++---------
 core/src/raw/adapters/typed_kv/backend.rs | 12 ++++++------
 2 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs
index 8ed445db6..51da4f0de 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -21,6 +21,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 
 use super::Adapter;
+use crate::raw::oio::StreamExt;
 use crate::raw::*;
 use crate::*;
 
@@ -389,18 +390,23 @@ impl<S> KvWriter<S> {
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        let bs = s.collect().await?;
+        self.buf = match self.buf.take() {
+            Some(mut v) => {
+                v.extend_from_slice(&bs);
+                Some(v)
+            }
+            None => Some(bs.into()),
+        };
+
+        Ok(())
     }
 
     async fn abort(&mut self) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "output writer doesn't support abort",
-        ))
+        self.buf = None;
+
+        Ok(())
     }
 
     async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs
index fad9373a7..349cefbed 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -22,7 +22,7 @@ use bytes::Bytes;
 
 use super::Adapter;
 use super::Value;
-use crate::raw::oio::VectorCursor;
+use crate::raw::oio::{StreamExt, VectorCursor};
 use crate::raw::*;
 use crate::*;
 
@@ -402,11 +402,11 @@ impl<S> KvWriter<S> {
 
 #[async_trait]
 impl<S: Adapter> oio::Write for KvWriter<S> {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
+    // TODO: we should avoid the copy here.
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        self.buf.push(s.collect().await?);
+
+        Ok(())
     }
 
     async fn abort(&mut self) -> Result<()> {

Reply via email to