This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/stream-based-write by this
push:
new f614704bb Fix kv
f614704bb is described below
commit f614704bb2af202e211c2a0d05cd45c07a78d6c8
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 19:33:03 2023 +0800
Fix kv
Signed-off-by: Xuanwo <[email protected]>
---
core/src/raw/adapters/kv/backend.rs | 24 +++++++++++++++---------
core/src/raw/adapters/typed_kv/backend.rs | 12 ++++++------
2 files changed, 21 insertions(+), 15 deletions(-)
diff --git a/core/src/raw/adapters/kv/backend.rs
b/core/src/raw/adapters/kv/backend.rs
index 8ed445db6..51da4f0de 100644
--- a/core/src/raw/adapters/kv/backend.rs
+++ b/core/src/raw/adapters/kv/backend.rs
@@ -21,6 +21,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use super::Adapter;
+use crate::raw::oio::StreamExt;
use crate::raw::*;
use crate::*;
@@ -389,18 +390,23 @@ impl<S> KvWriter<S> {
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
- async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
+ async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+ let bs = s.collect().await?;
+ self.buf = match self.buf.take() {
+ Some(mut v) => {
+ v.extend_from_slice(&bs);
+ Some(v)
+ }
+ None => Some(bs.into()),
+ };
+
+ Ok(())
}
async fn abort(&mut self) -> Result<()> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "output writer doesn't support abort",
- ))
+ self.buf = None;
+
+ Ok(())
}
async fn close(&mut self) -> Result<()> {
diff --git a/core/src/raw/adapters/typed_kv/backend.rs
b/core/src/raw/adapters/typed_kv/backend.rs
index fad9373a7..349cefbed 100644
--- a/core/src/raw/adapters/typed_kv/backend.rs
+++ b/core/src/raw/adapters/typed_kv/backend.rs
@@ -22,7 +22,7 @@ use bytes::Bytes;
use super::Adapter;
use super::Value;
-use crate::raw::oio::VectorCursor;
+use crate::raw::oio::{StreamExt, VectorCursor};
use crate::raw::*;
use crate::*;
@@ -402,11 +402,11 @@ impl<S> KvWriter<S> {
#[async_trait]
impl<S: Adapter> oio::Write for KvWriter<S> {
- async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
- Err(Error::new(
- ErrorKind::Unsupported,
- "Write::sink is not supported",
- ))
+ // TODO: we should avoid the copy here.
+ async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+ self.buf.push(s.collect().await?);
+
+ Ok(())
}
async fn abort(&mut self) -> Result<()> {