This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch add-pg-support in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit e9f013b68e4f6135c2655ff3af074755fcf62729 Author: Xuanwo <[email protected]> AuthorDate: Tue Aug 8 22:08:03 2023 +0800 Save work Signed-off-by: Xuanwo <[email protected]> --- Cargo.lock | 95 +++++++++ core/Cargo.toml | 81 ++------ core/src/services/mod.rs | 3 + core/src/services/postgresql/backend.rs | 328 ++++++++++++++++++++++++++++++++ core/src/services/postgresql/docs.md | 23 +++ core/src/services/postgresql/mod.rs | 19 ++ core/src/types/scheme.rs | 4 + 7 files changed, 485 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 36cdbc7d7..66d595c5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1566,6 +1566,12 @@ dependencies = [ "rand 0.7.3", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fastrand" version = "1.9.0" @@ -3341,6 +3347,7 @@ dependencies = [ "futures", "governor", "hdrs", + "hex", "http", "hyper", "lazy-regex", @@ -3381,6 +3388,7 @@ dependencies = [ "suppaftp", "tikv-client", "tokio", + "tokio-postgres", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -3931,6 +3939,24 @@ dependencies = [ "indexmap 1.9.3", ] +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.2" @@ -4049,6 +4075,35 @@ version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "767eb9f07d4a5ebcb39bbf2d452058a93c011373abf6832e24194a1c3f004794" +[[package]] +name = "postgres-protocol" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d" +dependencies = [ + "base64 0.21.2", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand 0.8.5", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -5187,6 +5242,12 @@ dependencies = [ "time 0.3.22", ] +[[package]] +name = "siphasher" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" + [[package]] name = "size" version = "0.4.1" @@ -5355,6 +5416,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stringprep" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db3737bde7edce97102e0e2b15365bf7a20bfdb5f60f4f9e8d7004258a51a8da" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "strsim" version = "0.10.0" @@ -5698,6 +5769,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-postgres" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e89f6234aa8fd43779746012fcf53603cdb91fdd8399aa0de868c2d56b6dde1" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot 0.12.1", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "socket2 0.5.3", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/core/Cargo.toml b/core/Cargo.toml index 08e0253d9..15cdbd588 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -34,23 +34,7 @@ version.workspace = true all-features = true [features] -default = [ - "rustls", - "services-azblob", - "services-azdfs", - "services-cos", - "services-fs", - "services-gcs", - "services-ghac", - "services-http", - "services-ipmfs", - "services-memory", - "services-obs", - "services-oss", - "services-s3", - "services-webdav", - "services-webhdfs", -] +default = ["rustls", "services-azblob", "services-azdfs", "services-cos", "services-fs", "services-gcs", "services-ghac", "services-http", "services-ipmfs", "services-memory", "services-obs", "services-oss", "services-s3", "services-webdav", "services-webhdfs"] # Build docs or not. # @@ -69,16 +53,7 @@ native-tls = ["reqwest/native-tls"] native-tls-vendored = ["reqwest/native-tls-vendored"] # Enable all layers. -layers-all = [ - "layers-chaos", - "layers-metrics", - "layers-prometheus", - "layers-tracing", - "layers-minitrace", - "layers-throttle", - "layers-await-tree", - "layers-async-backtrace", -] +layers-all = ["layers-chaos", "layers-metrics", "layers-prometheus", "layers-tracing", "layers-minitrace", "layers-throttle", "layers-await-tree", "layers-async-backtrace"] # Enable layers chaos support layers-chaos = ["dep:rand"] # Enable layers metrics support @@ -100,34 +75,17 @@ layers-await-tree = ["dep:await-tree"] # Enable layers async-backtrace support. layers-async-backtrace = ["dep:async-backtrace"] -services-azblob = [ - "dep:sha2", - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", -] -services-azdfs = [ - "dep:reqsign", - "reqsign?/services-azblob", - "reqsign?/reqwest_request", -] +services-azblob = ["dep:sha2", "dep:reqsign", "reqsign?/services-azblob", "reqsign?/reqwest_request"] +services-azdfs = ["dep:reqsign", "reqsign?/services-azblob", "reqsign?/reqwest_request"] services-cacache = ["dep:cacache"] -services-cos = [ - "dep:reqsign", - "reqsign?/services-tencent", - "reqsign?/reqwest_request", -] +services-cos = ["dep:reqsign", "reqsign?/services-tencent", "reqsign?/reqwest_request"] services-dashmap = ["dep:dashmap"] services-dropbox = [] services-etcd = ["dep:etcd-client"] services-foundationdb = ["dep:foundationdb"] services-fs = ["tokio/fs"] services-ftp = ["dep:suppaftp", "dep:lazy-regex", "dep:bb8", "dep:async-tls"] -services-gcs = [ - "dep:reqsign", - "reqsign?/services-google", - "reqsign?/reqwest_request", -] +services-gcs = ["dep:reqsign", "reqsign?/services-google", "reqsign?/reqwest_request"] services-gdrive = [] services-ghac = [] services-hdfs = ["dep:hdrs"] @@ -138,38 +96,23 @@ services-memcached = ["dep:bb8"] services-memory = [] services-mini-moka = ["dep:mini-moka"] services-moka = ["dep:moka"] -services-obs = [ - "dep:reqsign", - "reqsign?/services-huaweicloud", - "reqsign?/reqwest_request", -] +services-obs = ["dep:reqsign", "reqsign?/services-huaweicloud", "reqsign?/reqwest_request"] services-onedrive = [] -services-oss = [ - "dep:reqsign", - "reqsign?/services-aliyun", - "reqsign?/reqwest_request", -] +services-postgresql = ["dep:tokio-postgres", "dep:hex"] +services-oss = ["dep:reqsign", "reqsign?/services-aliyun", "reqsign?/reqwest_request"] services-persy = ["dep:persy"] services-redb = ["dep:redb"] services-redis = ["dep:redis"] services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"] services-redis-rustls = ["services-redis", "redis?/tokio-rustls-comp"] services-rocksdb = ["dep:rocksdb"] -services-s3 = [ - "dep:reqsign", - "reqsign?/services-aws", - "reqsign?/reqwest_request", -] +services-s3 = ["dep:reqsign", "reqsign?/services-aws", "reqsign?/reqwest_request"] services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:dirs"] services-sled = ["dep:sled"] services-supabase = [] services-tikv = ["tikv-client"] services-vercel-artifacts = [] -services-wasabi = [ - "dep:reqsign", - "reqsign?/services-aws", - "reqsign?/reqwest_request", -] +services-wasabi = ["dep:reqsign", "reqsign?/services-aws", "reqsign?/reqwest_request"] services-webdav = [] services-webhdfs = [] @@ -211,6 +154,7 @@ foundationdb = { version = "0.8.0", features = [ futures = { version = "0.3", default-features = false, features = ["std"] } governor = { version = "0.5", optional = true, features = ["std"] } hdrs = { version = "0.3.0", optional = true, features = ["async_file"] } +hex = { version = "0.4.3", optional = true } http = "0.2.9" hyper = "0.14" lazy-regex = { version = "2.5.0", optional = true } @@ -256,6 +200,7 @@ suppaftp = { version = "4.5", default-features = false, features = [ ], optional = true } tikv-client = { version = "0.2.0", optional = true } tokio = "1.27" +tokio-postgres = { version = "0.7.8", optional = true } tracing = { version = "0.1", optional = true } uuid = { version = "1", features = ["serde", "v4"] } diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs index 7545b71c5..656738f0f 100644 --- a/core/src/services/mod.rs +++ b/core/src/services/mod.rs @@ -203,3 +203,6 @@ pub use self::tikv::Tikv; mod foundationdb; #[cfg(feature = "services-foundationdb")] pub use self::foundationdb::Foundationdb; + +#[cfg(feature = "services-postgresql")] +mod postgresql; diff --git a/core/src/services/postgresql/backend.rs b/core/src/services/postgresql/backend.rs new file mode 100644 index 000000000..7609532ac --- /dev/null +++ b/core/src/services/postgresql/backend.rs @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::adapters::kv; +use crate::raw::*; +use crate::*; +use async_trait::async_trait; +use std::collections::HashMap; +use std::fmt::{Debug, Formatter}; +use std::str::FromStr; +use std::sync::Arc; +use tokio::sync::OnceCell; +use tokio_postgres::{Client, Config, Statement}; + +/// [Postgresql](https://www.postgresql.org/) services support. +#[doc = include_str!("docs.md")] +#[derive(Default)] +pub struct PostgresqlBuilder { + connection_string: Option<String>, + + table: Option<String>, + key_field: Option<String>, + value_field: Option<String>, + root: Option<String>, +} + +impl Debug for PostgresqlBuilder { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Builder"); + ds.field("table", &self.table); + ds.finish() + } +} + +impl PostgresqlBuilder { + /// Set the connection_string of the postgresql service. + /// + /// This connection string is `libpq-style connection strings`. There are two formats: + /// + /// ## Key Value + /// + /// This format consists of space-separated key-value pairs. Values which are either the empty + /// string or contain whitespace should be wrapped in '. ' and \ characters should be + /// backslash-escaped. + /// + /// - `host=localhost user=postgres connect_timeout=10 keepalives=0` + /// - `host=/var/lib/postgresql,localhost port=1234 user=postgres password='password with spaces'` + /// - `host=host1,host2,host3 port=1234,,5678 user=postgres target_session_attrs=read-write` + /// + /// Available keys could found at: <https://docs.rs/postgres/latest/postgres/config/struct.Config.html> + /// + /// ## Url + /// + /// This format resembles a URL with a scheme of either `postgres://` or `postgresql://`. + /// + /// - `postgresql://user@localhost` + /// - `postgresql://user:password@%2Fvar%2Flib%2Fpostgresql/mydb?connect_timeout=10` + /// - `postgresql://user@host1:1234,host2,host3:5678?target_session_attrs=read-write` + /// - `postgresql:///mydb?user=user&host=/var/lib/postgresql` + /// + /// # Notes + /// + /// If connection_string has been specified, other parameters will be ignored. + /// + /// For more information, please visit <https://docs.rs/postgres/latest/postgres/config/struct.Config.html> + pub fn connection_string(&mut self, v: &str) -> &mut Self { + if !v.is_empty() { + self.connection_string = Some(v.to_string()); + } + self + } + + /// set the working directory, all operations will be performed under it. + /// + /// default: "/" + pub fn root(&mut self, root: &str) -> &mut Self { + if !root.is_empty() { + self.root = Some(root.to_owned()); + } + self + } + + /// Set the table name of the postgresql service to read/write. + pub fn table(&mut self, table: &str) -> &mut Self { + if !table.is_empty() { + self.table = Some(table.to_string()); + } + self + } + + /// Set the key field name of the postgresql service to read/write. + /// + /// Default to `key` if not specified. + pub fn key_field(&mut self, key_field: &str) -> &mut Self { + if !key_field.is_empty() { + self.key_field = Some(key_field.to_string()); + } + self + } + + /// Set the value field name of the postgresql service to read/write. + /// + /// Default to `value` if not specified. + pub fn value_field(&mut self, value_field: &str) -> &mut Self { + if !value_field.is_empty() { + self.value_field = Some(value_field.to_string()); + } + self + } +} + +impl Builder for PostgresqlBuilder { + const SCHEME: Scheme = Scheme::Postgresql; + type Accessor = PostgresqlBackend; + + fn from_map(map: HashMap<String, String>) -> Self { + let mut builder = PostgresqlBuilder::default(); + + map.get("connection_string") + .map(|v| builder.connection_string(v)); + map.get("table").map(|v| builder.table(v)); + map.get("key_field").map(|v| builder.key_field(v)); + map.get("value_field").map(|v| builder.value_field(v)); + map.get("root").map(|v| builder.root(v)); + + builder + } + + fn build(&mut self) -> Result<Self::Accessor> { + let conn = match self.connection_string.clone() { + Some(v) => v, + None => { + return Err( + Error::new(ErrorKind::ConfigInvalid, "connection_string is empty") + .with_context("service", Scheme::Postgresql), + ) + } + }; + + let config = Config::from_str(&conn).map_err(|err| { + Error::new(ErrorKind::ConfigInvalid, "connection_string is invalid") + .with_context("service", Scheme::Postgresql) + .set_source(err) + })?; + + let table = match self.table.clone() { + Some(v) => v, + None => { + return Err(Error::new(ErrorKind::ConfigInvalid, "table is empty") + .with_context("service", Scheme::Postgresql)) + } + }; + let key_field = match self.key_field.clone() { + Some(v) => v, + None => "key".to_string(), + }; + let value_field = match self.value_field.clone() { + Some(v) => v, + None => "value".to_string(), + }; + let root = normalize_root( + self.root + .clone() + .unwrap_or_else(|| "/".to_string()) + .as_str(), + ); + + Ok(PostgresqlBackend::new(Adapter { + client: OnceCell::new(), + config, + table, + key_field, + value_field, + + statement_get: OnceCell::new(), + statement_set: OnceCell::new(), + statement_del: OnceCell::new(), + }) + .with_root(&root)) + } +} + +/// Backend for Postgresql service +pub type PostgresqlBackend = kv::Backend<Adapter>; + +#[derive(Clone)] +pub struct Adapter { + client: OnceCell<Arc<Client>>, + config: Config, + + table: String, + key_field: String, + value_field: String, + + /// Prepared statements for get/put/delete. + statement_get: OnceCell<Statement>, + statement_set: OnceCell<Statement>, + statement_del: OnceCell<Statement>, +} + +impl Debug for Adapter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Adapter"); + ds.finish_non_exhaustive() + } +} + +impl Adapter { + async fn get_client(&self) -> Result<&Client> { + self.client + .get_or_try_init(|| async { + // TODO: add tls support. + let (client, conn) = self.config.connect(tokio_postgres::NoTls).await?; + + // The connection object performs the actual communication with the database, + // so spawn it off to run on its own. + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("postgresql connection error: {}", e); + } + }); + + Ok(Arc::new(client)) + }) + .await + .map(|v| v.as_ref()) + } +} + +#[async_trait] +impl kv::Adapter for Adapter { + fn metadata(&self) -> kv::Metadata { + kv::Metadata::new( + Scheme::Postgresql, + &self.table, + Capability { + read: true, + write: true, + ..Default::default() + }, + ) + } + + async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> { + let query = format!( + "SELECT {} FROM {} WHERE {} = $1 LIMIT 1", + self.value_field, self.table, self.key_field + ); + let statement = self + .statement_get + .get_or_try_init(|| async { + self.get_client() + .await? + .prepare(&query) + .await + .map_err(Error::from) + }) + .await?; + + let rows = self.get_client().await?.query(statement, &[&path]).await?; + if rows.is_empty() { + return Ok(None); + } + let value: Vec<u8> = rows[0].get(0); + Ok(Some(value)) + } + + async fn set(&self, path: &str, value: &[u8]) -> Result<()> { + let query = format!( + "INSERT INTO {} ({}, {}) VALUES ('{}', '\\x$1')", + self.table, self.key_field, self.value_field, path, + ); + let statement = self + .statement_set + .get_or_try_init(|| async { + self.get_client() + .await? + .prepare(&query) + .await + .map_err(Error::from) + }) + .await?; + + let _ = self + .get_client() + .await? + .query(statement, &[&hex::encode(value)]) + .await?; + Ok(()) + } + + async fn delete(&self, path: &str) -> Result<()> { + let query = format!("DELETE FROM {} WHERE {} = $1", self.table, self.key_field); + let statement = self + .statement_del + .get_or_try_init(|| async { + self.get_client() + .await? + .prepare(&query) + .await + .map_err(Error::from) + }) + .await?; + + let _ = self.get_client().await?.query(statement, &[&path]).await?; + Ok(()) + } +} + +impl From<tokio_postgres::Error> for Error { + fn from(value: tokio_postgres::Error) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from postgresql").set_source(value) + } +} diff --git a/core/src/services/postgresql/docs.md b/core/src/services/postgresql/docs.md new file mode 100644 index 000000000..35ac3fff2 --- /dev/null +++ b/core/src/services/postgresql/docs.md @@ -0,0 +1,23 @@ +## Capabilities + +This service can be used to: + +- [x] stat +- [x] read +- [x] write +- [x] create_dir +- [x] delete +- [ ] copy +- [ ] rename +- [ ] ~~list~~ +- [ ] scan +- [ ] ~~presign~~ +- [ ] blocking + +## Configuration + +- `root`: Set the working directory of `OpenDAL` +- `connection_string`: Set the connection string of postgres server +- `table`: Set the table of postgresql +- `key_field`: Set the key field of postgresql +- `value_field`: Set the value field of postgresql diff --git a/core/src/services/postgresql/mod.rs b/core/src/services/postgresql/mod.rs new file mode 100644 index 000000000..427152a7e --- /dev/null +++ b/core/src/services/postgresql/mod.rs @@ -0,0 +1,19 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod backend; +pub use backend::PostgresqlBuilder as Postgresql; diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs index 4eabb5e25..4fb0303e8 100644 --- a/core/src/types/scheme.rs +++ b/core/src/types/scheme.rs @@ -84,6 +84,8 @@ pub enum Scheme { Persy, /// [redis][crate::services::Redis]: Redis services Redis, + /// [postgresql][crate::services::Postgresql]: Postgresql services + Postgresql, /// [rocksdb][crate::services::Rocksdb]: RocksDB services Rocksdb, /// [s3][crate::services::S3]: AWS S3 alike services. @@ -163,6 +165,7 @@ impl FromStr for Scheme { "obs" => Ok(Scheme::Obs), "onedrive" => Ok(Scheme::Onedrive), "persy" => Ok(Scheme::Persy), + "postgresql" => Ok(Scheme::Postgresql), "redb" => Ok(Scheme::Redb), "redis" => Ok(Scheme::Redis), "rocksdb" => Ok(Scheme::Rocksdb), @@ -206,6 +209,7 @@ impl From<Scheme> for &'static str { Scheme::Obs => "obs", Scheme::Onedrive => "onedrive", Scheme::Persy => "persy", + Scheme::Postgresql => "postgresql", Scheme::Gdrive => "gdrive", Scheme::Dropbox => "dropbox", Scheme::Redis => "redis",
