This is an automated email from the ASF dual-hosted git repository. kingsword09 pushed a commit to branch remove-kv in repository https://gitbox.apache.org/repos/asf/opendal.git
commit f5aec51f6fd94ed20b0109dd8248a03952e7c108 Author: Kingsword <[email protected]> AuthorDate: Fri Oct 24 08:36:44 2025 +0800 refactor: Remove deprecated kv and typed_kv adapters --- core/src/raw/adapters/kv/api.rs | 165 ------------------ core/src/raw/adapters/kv/backend.rs | 253 --------------------------- core/src/raw/adapters/kv/mod.rs | 31 ---- core/src/raw/adapters/mod.rs | 50 ------ core/src/raw/adapters/typed_kv/api.rs | 168 ------------------ core/src/raw/adapters/typed_kv/backend.rs | 279 ------------------------------ core/src/raw/adapters/typed_kv/mod.rs | 29 ---- core/src/raw/mod.rs | 1 - 8 files changed, 976 deletions(-) diff --git a/core/src/raw/adapters/kv/api.rs b/core/src/raw/adapters/kv/api.rs deleted file mode 100644 index 61f3fa832..000000000 --- a/core/src/raw/adapters/kv/api.rs +++ /dev/null @@ -1,165 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::fmt::Debug; -use std::future::ready; -use std::ops::DerefMut; - -use futures::Future; - -use crate::Capability; -use crate::Scheme; -use crate::raw::*; -use crate::*; - -/// Scan is the async iterator returned by `Adapter::scan`. -pub trait Scan: Send + Sync + Unpin { - /// Fetch the next key in the current key prefix - /// - /// `Ok(None)` means no further key will be returned - fn next(&mut self) -> impl Future<Output = Result<Option<String>>> + MaybeSend; -} - -/// A noop implementation of Scan -impl Scan for () { - async fn next(&mut self) -> Result<Option<String>> { - Ok(None) - } -} - -/// A Scan implementation for all trivial non-async iterators -#[allow(dead_code)] -pub struct ScanStdIter<I>(I); - -#[cfg(any(feature = "services-rocksdb", feature = "services-sled"))] -impl<I> ScanStdIter<I> -where - I: Iterator<Item = Result<String>> + Unpin + Send + Sync, -{ - /// Create a new ScanStdIter from an Iterator - pub(crate) fn new(inner: I) -> Self { - Self(inner) - } -} - -impl<I> Scan for ScanStdIter<I> -where - I: Iterator<Item = Result<String>> + Unpin + Send + Sync, -{ - async fn next(&mut self) -> Result<Option<String>> { - self.0.next().transpose() - } -} - -/// A type-erased wrapper of Scan -pub type Scanner = Box<dyn ScanDyn>; - -pub trait ScanDyn: Unpin + Send + Sync { - fn next_dyn(&mut self) -> BoxedFuture<'_, Result<Option<String>>>; -} - -impl<T: Scan + ?Sized> ScanDyn for T { - fn next_dyn(&mut self) -> BoxedFuture<'_, Result<Option<String>>> { - Box::pin(self.next()) - } -} - -impl<T: ScanDyn + ?Sized> Scan for Box<T> { - async fn next(&mut self) -> Result<Option<String>> { - self.deref_mut().next_dyn().await - } -} - -/// KvAdapter is the adapter to underlying kv services. -/// -/// By implement this trait, any kv service can work as an OpenDAL Service. -pub trait Adapter: Send + Sync + Debug + Unpin + 'static { - /// TODO: use default associate type `= ()` after stabilized - type Scanner: Scan; - - /// Return the info of this key value accessor. - fn info(&self) -> Info; - - /// Get a key from service. - /// - /// - return `Ok(None)` if this key is not exist. - fn get(&self, path: &str) -> impl Future<Output = Result<Option<Buffer>>> + MaybeSend; - - /// Set a key into service. - fn set(&self, path: &str, value: Buffer) -> impl Future<Output = Result<()>> + MaybeSend; - - /// Delete a key from service. - /// - /// - return `Ok(())` even if this key is not exist. - fn delete(&self, path: &str) -> impl Future<Output = Result<()>> + MaybeSend; - - /// Scan a key prefix to get all keys that start with this key. - fn scan(&self, path: &str) -> impl Future<Output = Result<Self::Scanner>> + MaybeSend { - let _ = path; - - ready(Err(Error::new( - ErrorKind::Unsupported, - "kv adapter doesn't support this operation", - ) - .with_operation("kv::Adapter::scan"))) - } - - /// Append a key into service - fn append(&self, path: &str, value: &[u8]) -> impl Future<Output = Result<()>> + MaybeSend { - let _ = path; - let _ = value; - - ready(Err(Error::new( - ErrorKind::Unsupported, - "kv adapter doesn't support this operation", - ) - .with_operation("kv::Adapter::append"))) - } -} - -/// Info for this key value accessor. -pub struct Info { - scheme: Scheme, - name: String, - capabilities: Capability, -} - -impl Info { - /// Create a new KeyValueAccessorInfo. - pub fn new(scheme: Scheme, name: &str, capabilities: Capability) -> Self { - Self { - scheme, - name: name.to_string(), - capabilities, - } - } - - /// Get the scheme. - pub fn scheme(&self) -> Scheme { - self.scheme - } - - /// Get the name. - pub fn name(&self) -> &str { - &self.name - } - - /// Get the capabilities. - pub fn capabilities(&self) -> Capability { - self.capabilities - } -} diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs deleted file mode 100644 index dde5b1c50..000000000 --- a/core/src/raw/adapters/kv/backend.rs +++ /dev/null @@ -1,253 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use super::Adapter; -use super::Scan; -use crate::raw::oio::HierarchyLister; -use crate::raw::oio::QueueBuf; -use crate::raw::*; -use crate::*; - -/// Backend of kv service. If the storage service is one k-v-like service, it should implement this kv [`Backend`] by right. -/// -/// `Backend` implements one general logic on how to read, write, scan the data from one kv store efficiently. -/// And the [`Adapter`] held by `Backend` will handle how to communicate with one k-v-like service really and provides -/// a series of basic operation for this service. -/// -/// OpenDAL developer can implement one new k-v store backend easily with help of this Backend. -#[derive(Debug, Clone)] -pub struct Backend<S: Adapter> { - kv: Arc<S>, - root: String, - info: Arc<AccessorInfo>, -} - -impl<S> Backend<S> -where - S: Adapter, -{ - /// Create a new kv backend. - pub fn new(kv: S) -> Self { - let kv_info = kv.info(); - Self { - kv: Arc::new(kv), - root: "/".to_string(), - info: { - let am: AccessorInfo = AccessorInfo::default(); - am.set_root("/"); - am.set_scheme(kv_info.scheme().into_static()); - am.set_name(kv_info.name()); - - let mut cap = kv_info.capabilities(); - if cap.read { - cap.stat = true; - } - - if cap.write { - cap.write_can_empty = true; - cap.delete = true; - } - - if cap.list { - cap.list_with_recursive = true; - } - - am.set_native_capability(cap); - - am.into() - }, - } - } - - /// Configure root within this backend. - pub fn with_root(self, root: &str) -> Self { - self.with_normalized_root(normalize_root(root)) - } - - /// Configure root within this backend. - /// - /// This method assumes root is normalized. - pub(crate) fn with_normalized_root(mut self, root: String) -> Self { - let root = normalize_root(&root); - self.info.set_root(&root); - self.root = root; - self - } -} - -impl<S: Adapter> Access for Backend<S> { - type Reader = Buffer; - type Writer = KvWriter<S>; - type Lister = HierarchyLister<KvLister<S::Scanner>>; - type Deleter = oio::OneShotDeleter<KvDeleter<S>>; - - fn info(&self) -> Arc<AccessorInfo> { - self.info.clone() - } - - async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { - let p = build_abs_path(&self.root, path); - - if p == build_abs_path(&self.root, "") { - Ok(RpStat::new(Metadata::new(EntryMode::DIR))) - } else { - let bs = self.kv.get(&p).await?; - match bs { - Some(bs) => Ok(RpStat::new( - Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64), - )), - None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), - } - } - } - - async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let p = build_abs_path(&self.root, path); - let bs = match self.kv.get(&p).await? { - Some(bs) => bs, - None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), - }; - Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) - } - - async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let p = build_abs_path(&self.root, path); - - Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p))) - } - - async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { - Ok(( - RpDelete::default(), - oio::OneShotDeleter::new(KvDeleter::new(self.kv.clone(), self.root.clone())), - )) - } - - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let p = build_abs_path(&self.root, path); - let res = self.kv.scan(&p).await?; - let lister = KvLister::new(&self.root, res); - let lister = HierarchyLister::new(lister, path, args.recursive()); - - Ok((RpList::default(), lister)) - } -} - -pub struct KvLister<Iter> { - root: String, - inner: Iter, -} - -impl<Iter> KvLister<Iter> -where - Iter: Scan, -{ - fn new(root: &str, inner: Iter) -> Self { - Self { - root: root.to_string(), - inner, - } - } - - async fn inner_next(&mut self) -> Result<Option<oio::Entry>> { - Ok(self.inner.next().await?.map(|v| { - let mode = if v.ends_with('/') { - EntryMode::DIR - } else { - EntryMode::FILE - }; - let mut path = build_rel_path(&self.root, &v); - if path.is_empty() { - path = "/".to_string(); - } - oio::Entry::new(&path, Metadata::new(mode)) - })) - } -} - -impl<Iter> oio::List for KvLister<Iter> -where - Iter: Scan, -{ - async fn next(&mut self) -> Result<Option<oio::Entry>> { - self.inner_next().await - } -} - -pub struct KvWriter<S> { - kv: Arc<S>, - path: String, - buffer: QueueBuf, -} - -impl<S> KvWriter<S> { - fn new(kv: Arc<S>, path: String) -> Self { - KvWriter { - kv, - path, - buffer: QueueBuf::new(), - } - } -} - -/// # Safety -/// -/// We will only take `&mut Self` reference for KvWriter. -unsafe impl<S: Adapter> Sync for KvWriter<S> {} - -impl<S: Adapter> oio::Write for KvWriter<S> { - async fn write(&mut self, bs: Buffer) -> Result<()> { - self.buffer.push(bs); - Ok(()) - } - - async fn close(&mut self) -> Result<Metadata> { - let buf = self.buffer.clone().collect(); - let length = buf.len() as u64; - self.kv.set(&self.path, buf).await?; - - let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); - Ok(meta) - } - - async fn abort(&mut self) -> Result<()> { - self.buffer.clear(); - Ok(()) - } -} - -pub struct KvDeleter<S> { - kv: Arc<S>, - root: String, -} - -impl<S> KvDeleter<S> { - fn new(kv: Arc<S>, root: String) -> Self { - KvDeleter { kv, root } - } -} - -impl<S: Adapter> oio::OneShotDelete for KvDeleter<S> { - async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { - let p = build_abs_path(&self.root, &path); - - self.kv.delete(&p).await?; - Ok(()) - } -} diff --git a/core/src/raw/adapters/kv/mod.rs b/core/src/raw/adapters/kv/mod.rs deleted file mode 100644 index d1f55b179..000000000 --- a/core/src/raw/adapters/kv/mod.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Providing Key Value Adapter for OpenDAL. -//! -//! Any services that implement `Adapter` can be used an OpenDAL Service. - -mod api; -pub use api::Adapter; -pub use api::Info; -pub use api::Scan; -#[cfg(any(feature = "services-rocksdb", feature = "services-sled"))] -pub(crate) use api::ScanStdIter; -pub use api::Scanner; - -mod backend; -pub use backend::Backend; diff --git a/core/src/raw/adapters/mod.rs b/core/src/raw/adapters/mod.rs deleted file mode 100644 index c5a631346..000000000 --- a/core/src/raw/adapters/mod.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Providing adapters and its implementations. -//! -//! Adapters in OpenDAL means services that shares similar behaviors. We use -//! adapter to make those services been implemented more easily. For example, -//! with [`kv::Adapter`], users only need to implement `get`, `set` for a service. -//! -//! # Notes -//! -//! Please import the module instead of its type. -//! -//! For example, use the following: -//! -//! ```ignore -//! use opendal::adapters::kv; -//! -//! impl kv::Adapter for MyType {} -//! ``` -//! -//! Instead of: -//! -//! ```ignore -//! use opendal::adapters::kv::Adapter; -//! -//! impl Adapter for MyType {} -//! ``` -//! -//! # Available Adapters -//! -//! - [`kv::Adapter`]: Adapter for Key Value Services like `redis`. -//! - [`typed_kv::Adapter`]: Adapter key value services that in-memory. - -pub mod kv; -pub mod typed_kv; diff --git a/core/src/raw/adapters/typed_kv/api.rs b/core/src/raw/adapters/typed_kv/api.rs deleted file mode 100644 index ab1e63d87..000000000 --- a/core/src/raw/adapters/typed_kv/api.rs +++ /dev/null @@ -1,168 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use crate::Buffer; -use crate::EntryMode; -use crate::Error; -use crate::ErrorKind; -use crate::Metadata; -use crate::Result; -use crate::Scheme; -use crate::raw::{MaybeSend, Timestamp}; -use std::fmt::Debug; -use std::future::Future; -use std::future::ready; -use std::mem::size_of; - -/// Adapter is the typed adapter to underlying kv services. -/// -/// By implement this trait, any kv service can work as an OpenDAL Service. -/// -/// # Notes -/// -/// `typed_kv::Adapter` is the typed version of `kv::Adapter`. It's more -/// efficient if the underlying kv service can store data with its type. For -/// example, we can store `Bytes` along with its metadata so that we don't -/// need to serialize/deserialize it when we get it from the service. -/// -/// Ideally, we should use `typed_kv::Adapter` instead of `kv::Adapter` for -/// in-memory rust libs like moka and dashmap. -pub trait Adapter: Send + Sync + Debug + Unpin + 'static { - /// Return the info of this key value accessor. - fn info(&self) -> Info; - - /// Get a value from adapter. - fn get(&self, path: &str) -> impl Future<Output = Result<Option<Value>>> + MaybeSend; - - /// Set a value into adapter. - fn set(&self, path: &str, value: Value) -> impl Future<Output = Result<()>> + MaybeSend; - - /// Delete a value from adapter. - fn delete(&self, path: &str) -> impl Future<Output = Result<()>> + MaybeSend; - - /// Scan a key prefix to get all keys that start with this key. - fn scan(&self, path: &str) -> impl Future<Output = Result<Vec<String>>> + MaybeSend { - let _ = path; - - ready(Err(Error::new( - ErrorKind::Unsupported, - "typed_kv adapter doesn't support this operation", - ) - .with_operation("typed_kv::Adapter::scan"))) - } -} - -/// Value is the typed value stored in adapter. -/// -/// It's cheap to clone so that users can read data without extra copy. -#[derive(Debug, Clone)] -pub struct Value { - /// Metadata of this value. - pub metadata: Metadata, - /// The corresponding content of this value. - pub value: Buffer, -} - -impl Value { - /// Create a new dir of value. - pub fn new_dir() -> Self { - Self { - metadata: Metadata::new(EntryMode::DIR) - .with_content_length(0) - .with_last_modified(Timestamp::now()), - value: Buffer::new(), - } - } - - /// Size returns the in-memory size of Value. - pub fn size(&self) -> usize { - size_of::<Metadata>() + self.value.len() - } -} - -/// Capability is used to describe what operations are supported -/// by Typed KV Operator. -#[derive(Copy, Clone, Default)] -pub struct Capability { - /// If typed_kv operator supports get natively. - pub get: bool, - /// If typed_kv operator supports set natively. - pub set: bool, - /// If typed_kv operator supports delete natively. - pub delete: bool, - /// If typed_kv operator supports scan natively. - pub scan: bool, - /// If typed_kv operator supports shared access. - pub shared: bool, -} - -impl Debug for Capability { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut s = vec![]; - - if self.get { - s.push("Get") - } - if self.set { - s.push("Set"); - } - if self.delete { - s.push("Delete"); - } - if self.scan { - s.push("Scan"); - } - if self.shared { - s.push("Shared"); - } - - write!(f, "{{ {} }}", s.join(" | ")) - } -} - -/// Info for this key value accessor. -pub struct Info { - scheme: Scheme, - name: String, - capabilities: Capability, -} - -impl Info { - /// Create a new KeyValueAccessorInfo. - pub fn new(scheme: Scheme, name: &str, capabilities: Capability) -> Self { - Self { - scheme, - name: name.to_string(), - capabilities, - } - } - - /// Get the scheme. - pub fn scheme(&self) -> Scheme { - self.scheme - } - - /// Get the name. - pub fn name(&self) -> &str { - &self.name - } - - /// Get the capabilities. - pub fn capabilities(&self) -> Capability { - self.capabilities - } -} diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs deleted file mode 100644 index 70aabdbe7..000000000 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ /dev/null @@ -1,279 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; -use std::vec::IntoIter; - -use super::Adapter; -use super::Value; -use crate::raw::oio::HierarchyLister; -use crate::raw::oio::QueueBuf; -use crate::raw::*; -use crate::*; - -/// The typed kv backend which implements Accessor for typed kv adapter. -#[derive(Debug, Clone)] -pub struct Backend<S: Adapter> { - kv: Arc<S>, - root: String, - info: Arc<AccessorInfo>, -} - -impl<S> Backend<S> -where - S: Adapter, -{ - /// Create a new kv backend. - pub fn new(kv: S) -> Self { - let kv_info = kv.info(); - Self { - kv: Arc::new(kv), - root: "/".to_string(), - info: { - let am: AccessorInfo = AccessorInfo::default(); - am.set_root("/"); - am.set_scheme(kv_info.scheme().into_static()); - am.set_name(kv_info.name()); - - let kv_cap = kv_info.capabilities(); - let mut cap = Capability::default(); - if kv_cap.get { - cap.read = true; - cap.stat = true; - } - - if kv_cap.set { - cap.write = true; - cap.write_can_empty = true; - } - - if kv_cap.delete { - cap.delete = true; - } - - if kv_cap.scan { - cap.list = true; - cap.list_with_recursive = true; - } - - if kv_cap.shared { - cap.shared = true; - } - - am.set_native_capability(cap); - - am.into() - }, - } - } - - /// Configure root within this backend. - pub fn with_root(mut self, root: &str) -> Self { - let root = normalize_root(root); - self.info.set_root(&root); - self.root = root; - self - } -} - -impl<S: Adapter> Access for Backend<S> { - type Reader = Buffer; - type Writer = KvWriter<S>; - type Lister = HierarchyLister<KvLister>; - type Deleter = oio::OneShotDeleter<KvDeleter<S>>; - - fn info(&self) -> Arc<AccessorInfo> { - self.info.clone() - } - - async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { - let p = build_abs_path(&self.root, path); - - if p == build_abs_path(&self.root, "") { - Ok(RpStat::new(Metadata::new(EntryMode::DIR))) - } else { - let bs = self.kv.get(&p).await?; - match bs { - Some(bs) => Ok(RpStat::new(bs.metadata)), - None => Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), - } - } - } - - async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let p = build_abs_path(&self.root, path); - - let bs = match self.kv.get(&p).await? { - // TODO: we can reuse the metadata in value to build content range. - Some(bs) => bs.value, - None => return Err(Error::new(ErrorKind::NotFound, "kv doesn't have this path")), - }; - - Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) - } - - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let p = build_abs_path(&self.root, path); - - Ok((RpWrite::new(), KvWriter::new(self.kv.clone(), p, args))) - } - - async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { - Ok(( - RpDelete::default(), - oio::OneShotDeleter::new(KvDeleter::new(self.kv.clone(), self.root.clone())), - )) - } - - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let p = build_abs_path(&self.root, path); - let res = self.kv.scan(&p).await?; - let lister = KvLister::new(&self.root, res); - let lister = HierarchyLister::new(lister, path, args.recursive()); - - Ok((RpList::default(), lister)) - } -} - -pub struct KvLister { - root: String, - inner: IntoIter<String>, -} - -impl KvLister { - fn new(root: &str, inner: Vec<String>) -> Self { - Self { - root: root.to_string(), - inner: inner.into_iter(), - } - } - - fn inner_next(&mut self) -> Option<oio::Entry> { - self.inner.next().map(|v| { - let mode = if v.ends_with('/') { - EntryMode::DIR - } else { - EntryMode::FILE - }; - let mut path = build_rel_path(&self.root, &v); - if path.is_empty() { - path = "/".to_string(); - } - oio::Entry::new(&path, Metadata::new(mode)) - }) - } -} - -impl oio::List for KvLister { - async fn next(&mut self) -> Result<Option<oio::Entry>> { - Ok(self.inner_next()) - } -} - -pub struct KvWriter<S> { - kv: Arc<S>, - path: String, - - op: OpWrite, - buf: Option<QueueBuf>, - value: Option<Value>, -} - -/// # Safety -/// -/// We will only take `&mut Self` reference for KvWriter. -unsafe impl<S: Adapter> Sync for KvWriter<S> {} - -impl<S> KvWriter<S> { - fn new(kv: Arc<S>, path: String, op: OpWrite) -> Self { - KvWriter { - kv, - path, - op, - buf: None, - value: None, - } - } - - fn build(&mut self) -> Value { - let value = self.buf.take().map(QueueBuf::collect).unwrap_or_default(); - - let mut metadata = Metadata::new(EntryMode::FILE); - metadata.set_content_length(value.len() as u64); - - if let Some(v) = self.op.cache_control() { - metadata.set_cache_control(v); - } - if let Some(v) = self.op.content_disposition() { - metadata.set_content_disposition(v); - } - if let Some(v) = self.op.content_type() { - metadata.set_content_type(v); - } - - Value { metadata, value } - } -} - -impl<S: Adapter> oio::Write for KvWriter<S> { - async fn write(&mut self, bs: Buffer) -> Result<()> { - let mut buf = self.buf.take().unwrap_or_default(); - buf.push(bs); - self.buf = Some(buf); - Ok(()) - } - - async fn close(&mut self) -> Result<Metadata> { - let value = match &self.value { - Some(value) => value.clone(), - None => { - let value = self.build(); - self.value = Some(value.clone()); - value - } - }; - let meta = value.metadata.clone(); - self.kv.set(&self.path, value).await?; - - Ok(meta) - } - - async fn abort(&mut self) -> Result<()> { - self.buf = None; - Ok(()) - } -} - -pub struct KvDeleter<S> { - kv: Arc<S>, - root: String, -} - -impl<S> KvDeleter<S> { - fn new(kv: Arc<S>, root: String) -> Self { - KvDeleter { kv, root } - } -} - -impl<S: Adapter> oio::OneShotDelete for KvDeleter<S> { - async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { - let p = build_abs_path(&self.root, &path); - - self.kv.delete(&p).await?; - Ok(()) - } -} diff --git a/core/src/raw/adapters/typed_kv/mod.rs b/core/src/raw/adapters/typed_kv/mod.rs deleted file mode 100644 index 567ecddac..000000000 --- a/core/src/raw/adapters/typed_kv/mod.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Providing Typed Key Value Adapter for OpenDAL. -//! -//! Any services that implement `Adapter` can be used an OpenDAL Service. - -mod api; -pub use api::Adapter; -pub use api::Capability; -pub use api::Info; -pub use api::Value; - -mod backend; -pub use backend::Backend; diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index 768026850..446cdb8eb 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -95,7 +95,6 @@ mod atomic_util; pub use atomic_util::*; // Expose as a pub mod to avoid confusing. -pub mod adapters; pub mod oio; #[cfg(feature = "tests")] pub mod tests;
