This is an automated email from the ASF dual-hosted git repository. koushiro pushed a commit to branch refactor-postgresql in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 333ed42edeb25a67e30cee212541bc24a3e6814a Author: koushiro <[email protected]> AuthorDate: Tue Oct 21 17:23:43 2025 +0800 refactor: migrate postgresql service from adapter::kv to impl Access directly --- core/src/services/postgresql/backend.rs | 172 +++++++++++---------- core/src/services/postgresql/config.rs | 4 +- core/src/services/postgresql/core.rs | 83 ++++++++++ .../src/services/postgresql/{mod.rs => deleter.rs} | 28 +++- core/src/services/postgresql/docs.md | 5 +- core/src/services/postgresql/mod.rs | 4 + core/src/services/postgresql/writer.rs | 59 +++++++ 7 files changed, 262 insertions(+), 93 deletions(-) diff --git a/core/src/services/postgresql/backend.rs b/core/src/services/postgresql/backend.rs index 8d5332d84..0e53f5579 100644 --- a/core/src/services/postgresql/backend.rs +++ b/core/src/services/postgresql/backend.rs @@ -17,15 +17,16 @@ use std::fmt::Debug; use std::fmt::Formatter; -use std::str::FromStr; +use std::sync::Arc; -use sqlx::PgPool; use sqlx::postgres::PgConnectOptions; use tokio::sync::OnceCell; -use crate::raw::adapters::kv; +use super::config::PostgresqlConfig; +use super::core::*; +use super::deleter::PostgresqlDeleter; +use super::writer::PostgresqlWriter; use crate::raw::*; -use crate::services::PostgresqlConfig; use crate::*; /// [PostgreSQL](https://www.postgresql.org/) services support. @@ -118,7 +119,7 @@ impl Builder for PostgresqlBuilder { } }; - let config = PgConnectOptions::from_str(&conn).map_err(|err| { + let config = conn.parse::<PgConnectOptions>().map_err(|err| { Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid") .with_context("service", Scheme::Postgresql) .set_source(err) @@ -141,7 +142,7 @@ impl Builder for PostgresqlBuilder { let root = normalize_root(self.config.root.unwrap_or_else(|| "/".to_string()).as_str()); - Ok(PostgresqlBackend::new(Adapter { + Ok(PostgresqlBackend::new(PostgresqlCore { pool: OnceCell::new(), config, table, @@ -153,99 +154,100 @@ impl Builder for PostgresqlBuilder { } /// Backend for Postgresql service -pub type PostgresqlBackend = kv::Backend<Adapter>; - -#[derive(Debug, Clone)] -pub struct Adapter { - pool: OnceCell<PgPool>, - config: PgConnectOptions, - - table: String, - key_field: String, - value_field: String, +#[derive(Clone, Debug)] +pub struct PostgresqlBackend { + core: Arc<PostgresqlCore>, + root: String, + info: Arc<AccessorInfo>, } -impl Adapter { - async fn get_client(&self) -> Result<&PgPool> { - self.pool - .get_or_try_init(|| async { - let pool = PgPool::connect_with(self.config.clone()) - .await - .map_err(parse_postgres_error)?; - Ok(pool) - }) - .await +impl PostgresqlBackend { + pub fn new(core: PostgresqlCore) -> Self { + let info = AccessorInfo::default(); + info.set_scheme(Scheme::Postgresql.into_static()); + info.set_name(&core.table); + info.set_root("/"); + info.set_native_capability(Capability { + read: true, + stat: true, + write: true, + write_can_empty: true, + delete: true, + shared: true, + ..Default::default() + }); + + Self { + core: Arc::new(core), + root: "/".to_string(), + info: Arc::new(info), + } } -} -impl kv::Adapter for Adapter { - type Scanner = (); - - fn info(&self) -> kv::Info { - kv::Info::new( - Scheme::Postgresql, - &self.table, - Capability { - read: true, - write: true, - shared: true, - ..Default::default() - }, - ) + fn with_normalized_root(mut self, root: String) -> Self { + self.info.set_root(&root); + self.root = root; + self } +} - async fn get(&self, path: &str) -> Result<Option<Buffer>> { - let pool = self.get_client().await?; - - let value: Option<Vec<u8>> = sqlx::query_scalar(&format!( - r#"SELECT "{}" FROM "{}" WHERE "{}" = $1 LIMIT 1"#, - self.value_field, self.table, self.key_field - )) - .bind(path) - .fetch_optional(pool) - .await - .map_err(parse_postgres_error)?; +impl Access for PostgresqlBackend { + type Reader = Buffer; + type Writer = PostgresqlWriter; + type Lister = (); + type Deleter = oio::OneShotDeleter<PostgresqlDeleter>; - Ok(value.map(Buffer::from)) + fn info(&self) -> Arc<AccessorInfo> { + self.info.clone() } - async fn set(&self, path: &str, value: Buffer) -> Result<()> { - let pool = self.get_client().await?; - - let table = &self.table; - let key_field = &self.key_field; - let value_field = &self.value_field; - sqlx::query(&format!( - r#"INSERT INTO "{table}" ("{key_field}", "{value_field}") - VALUES ($1, $2) - ON CONFLICT ("{key_field}") - DO UPDATE SET "{value_field}" = EXCLUDED."{value_field}""#, - )) - .bind(path) - .bind(value.to_vec()) - .execute(pool) - .await - .map_err(parse_postgres_error)?; + async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { + let p = build_abs_path(&self.root, path); - Ok(()) + if p == build_abs_path(&self.root, "") { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + let bs = self.core.get(&p).await?; + match bs { + Some(bs) => Ok(RpStat::new( + Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64), + )), + None => Err(Error::new( + ErrorKind::NotFound, + "kv not found in postgresql", + )), + } + } + } + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_abs_path(&self.root, path); + let bs = match self.core.get(&p).await? { + Some(bs) => bs, + None => { + return Err(Error::new( + ErrorKind::NotFound, + "kv not found in postgresql", + )); + } + }; + Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) } - async fn delete(&self, path: &str) -> Result<()> { - let pool = self.get_client().await?; + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_abs_path(&self.root, path); + Ok((RpWrite::new(), PostgresqlWriter::new(self.core.clone(), p))) + } - sqlx::query(&format!( - "DELETE FROM {} WHERE {} = $1", - self.table, self.key_field + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + Ok(( + RpDelete::default(), + oio::OneShotDeleter::new(PostgresqlDeleter::new(self.core.clone(), self.root.clone())), )) - .bind(path) - .execute(pool) - .await - .map_err(parse_postgres_error)?; - - Ok(()) } -} -fn parse_postgres_error(err: sqlx::Error) -> Error { - Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err) + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { + let _ = build_abs_path(&self.root, path); + Ok((RpList::default(), ())) + } } diff --git a/core/src/services/postgresql/config.rs b/core/src/services/postgresql/config.rs index 995947056..8d167ea34 100644 --- a/core/src/services/postgresql/config.rs +++ b/core/src/services/postgresql/config.rs @@ -18,10 +18,11 @@ use std::fmt::Debug; use std::fmt::Formatter; -use super::backend::PostgresqlBuilder; use serde::Deserialize; use serde::Serialize; +use super::backend::PostgresqlBuilder; + /// Config for PostgreSQL services support. #[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] @@ -68,6 +69,7 @@ impl Debug for PostgresqlConfig { impl crate::Configurator for PostgresqlConfig { type Builder = PostgresqlBuilder; + fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> { let mut map = uri.options().clone(); diff --git a/core/src/services/postgresql/core.rs b/core/src/services/postgresql/core.rs new file mode 100644 index 000000000..fb8396377 --- /dev/null +++ b/core/src/services/postgresql/core.rs @@ -0,0 +1,83 @@ +use sqlx::PgPool; +use sqlx::postgres::PgConnectOptions; +use tokio::sync::OnceCell; + +use crate::*; + +#[derive(Clone, Debug)] +pub struct PostgresqlCore { + pub pool: OnceCell<PgPool>, + pub config: PgConnectOptions, + + pub table: String, + pub key_field: String, + pub value_field: String, +} + +impl PostgresqlCore { + async fn get_client(&self) -> Result<&PgPool> { + self.pool + .get_or_try_init(|| async { + let pool = PgPool::connect_with(self.config.clone()) + .await + .map_err(parse_postgres_error)?; + Ok(pool) + }) + .await + } + + pub async fn get(&self, path: &str) -> Result<Option<Buffer>> { + let pool = self.get_client().await?; + + let value: Option<Vec<u8>> = sqlx::query_scalar(&format!( + r#"SELECT "{}" FROM "{}" WHERE "{}" = $1 LIMIT 1"#, + self.value_field, self.table, self.key_field + )) + .bind(path) + .fetch_optional(pool) + .await + .map_err(parse_postgres_error)?; + + Ok(value.map(Buffer::from)) + } + + pub async fn set(&self, path: &str, value: Buffer) -> Result<()> { + let pool = self.get_client().await?; + + let table = &self.table; + let key_field = &self.key_field; + let value_field = &self.value_field; + sqlx::query(&format!( + r#"INSERT INTO "{table}" ("{key_field}", "{value_field}") + VALUES ($1, $2) + ON CONFLICT ("{key_field}") + DO UPDATE SET "{value_field}" = EXCLUDED."{value_field}""#, + )) + .bind(path) + .bind(value.to_vec()) + .execute(pool) + .await + .map_err(parse_postgres_error)?; + + Ok(()) + } + + pub async fn delete(&self, path: &str) -> Result<()> { + let pool = self.get_client().await?; + + sqlx::query(&format!( + "DELETE FROM {} WHERE {} = $1", + self.table, self.key_field + )) + .bind(path) + .execute(pool) + .await + .map_err(parse_postgres_error)?; + + Ok(()) + } +} + +fn parse_postgres_error(err: sqlx::Error) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(err) +} diff --git a/core/src/services/postgresql/mod.rs b/core/src/services/postgresql/deleter.rs similarity index 59% copy from core/src/services/postgresql/mod.rs copy to core/src/services/postgresql/deleter.rs index bba64b60c..68acf06ba 100644 --- a/core/src/services/postgresql/mod.rs +++ b/core/src/services/postgresql/deleter.rs @@ -15,8 +15,28 @@ // specific language governing permissions and limitations // under the License. -mod backend; -pub use backend::PostgresqlBuilder as Postgresql; +use std::sync::Arc; -mod config; -pub use config::PostgresqlConfig; +use super::core::*; +use crate::raw::oio; +use crate::raw::*; +use crate::*; + +pub struct PostgresqlDeleter { + core: Arc<PostgresqlCore>, + root: String, +} + +impl PostgresqlDeleter { + pub fn new(core: Arc<PostgresqlCore>, root: String) -> Self { + Self { core, root } + } +} + +impl oio::OneShotDelete for PostgresqlDeleter { + async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let p = build_abs_path(&self.root, &path); + self.core.delete(&p).await?; + Ok(()) + } +} diff --git a/core/src/services/postgresql/docs.md b/core/src/services/postgresql/docs.md index 0c21f2d55..b26c5bf40 100644 --- a/core/src/services/postgresql/docs.md +++ b/core/src/services/postgresql/docs.md @@ -2,16 +2,15 @@ This service can be used to: +- [ ] create_dir - [x] stat - [x] read - [x] write -- [x] create_dir - [x] delete - [ ] copy - [ ] rename -- [ ] ~~list~~ +- [ ] list - [ ] ~~presign~~ -- [ ] blocking ## Configuration diff --git a/core/src/services/postgresql/mod.rs b/core/src/services/postgresql/mod.rs index bba64b60c..525e607fc 100644 --- a/core/src/services/postgresql/mod.rs +++ b/core/src/services/postgresql/mod.rs @@ -16,6 +16,10 @@ // under the License. mod backend; +mod core; +mod deleter; +mod writer; + pub use backend::PostgresqlBuilder as Postgresql; mod config; diff --git a/core/src/services/postgresql/writer.rs b/core/src/services/postgresql/writer.rs new file mode 100644 index 000000000..38ced921c --- /dev/null +++ b/core/src/services/postgresql/writer.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use super::core::*; +use crate::raw::oio; +use crate::*; + +pub struct PostgresqlWriter { + core: Arc<PostgresqlCore>, + path: String, + buffer: oio::QueueBuf, +} + +impl PostgresqlWriter { + pub fn new(core: Arc<PostgresqlCore>, path: String) -> Self { + Self { + core, + path, + buffer: oio::QueueBuf::new(), + } + } +} + +impl oio::Write for PostgresqlWriter { + async fn write(&mut self, bs: Buffer) -> Result<()> { + self.buffer.push(bs); + Ok(()) + } + + async fn close(&mut self) -> Result<Metadata> { + let buf = self.buffer.clone().collect(); + let length = buf.len() as u64; + self.core.set(&self.path, buf).await?; + + let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); + Ok(meta) + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + Ok(()) + } +}
