This is an automated email from the ASF dual-hosted git repository.
junouyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 66f98cd8a feat: Add postgresql support for OpenDAL (#2815)
66f98cd8a is described below
commit 66f98cd8aa9d14572764b946e538f2047252f6d2
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 9 17:52:27 2023 +0800
feat: Add postgresql support for OpenDAL (#2815)
* Fix typo
Signed-off-by: Xuanwo <[email protected]>
* Save work
Signed-off-by: Xuanwo <[email protected]>
* Add test
Signed-off-by: Xuanwo <[email protected]>
* Add password
Signed-off-by: Xuanwo <[email protected]>
* fix build
Signed-off-by: Xuanwo <[email protected]>
* Add postgres
Signed-off-by: Xuanwo <[email protected]>
* FIx build
Signed-off-by: Xuanwo <[email protected]>
* Fix build
Signed-off-by: Xuanwo <[email protected]>
* Add diff check
Signed-off-by: Xuanwo <[email protected]>
* Fix
Signed-off-by: Xuanwo <[email protected]>
* try
Signed-off-by: Xuanwo <[email protected]>
* Fix conflict
Signed-off-by: Xuanwo <[email protected]>
* Format toml
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
.github/workflows/ci.yml | 4 +
.github/workflows/service_test_postgresql.yml | 85 ++++++
Cargo.lock | 94 ++++++
bin/oay/Cargo.toml | 17 +-
bin/oli/Cargo.toml | 2 +-
bindings/lua/Cargo.toml | 6 +-
bindings/ocaml/Cargo.toml | 5 +-
core/Cargo.toml | 7 +-
core/src/raw/adapters/kv/api.rs | 2 +-
core/src/raw/adapters/mod.rs | 3 +-
core/src/services/mod.rs | 5 +
core/src/services/postgresql/backend.rs | 333 +++++++++++++++++++++
core/src/services/postgresql/docs.md | 23 ++
.../{raw/adapters => services/postgresql}/mod.rs | 34 +--
core/src/services/tikv/fixtures/pd.toml | 2 +-
core/src/services/tikv/fixtures/tikv.toml | 8 +-
core/src/types/scheme.rs | 4 +
core/tests/behavior/main.rs | 2 +
18 files changed, 577 insertions(+), 59 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 001a3d7b3..5286397b9 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -102,12 +102,16 @@ jobs:
with:
distribution: temurin
java-version: "11"
+
- name: Setup OCaml toolchain
uses: ./.github/actions/setup-ocaml
- name: Cargo clippy
run: cargo clippy --all-targets --all-features --workspace -- -D
warnings
+ - name: Check diff
+ run: git diff --exit-code
+
check_msrv:
runs-on: ubuntu-latest
env:
diff --git a/.github/workflows/service_test_postgresql.yml
b/.github/workflows/service_test_postgresql.yml
new file mode 100644
index 000000000..2e3eebde9
--- /dev/null
+++ b/.github/workflows/service_test_postgresql.yml
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Service Test Postgresql
+
+on:
+ push:
+ branches:
+ - main
+ pull_request:
+ branches:
+ - main
+ paths:
+ - "core/src/**"
+ - "core/tests/**"
+ - "!core/src/docs/**"
+ - "!core/src/services/**"
+ - "core/src/services/postgresql/**"
+ - ".github/workflows/service_test_postgresql.yml"
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+ cancel-in-progress: true
+
+jobs:
+ postgresql:
+ runs-on: ubuntu-latest
+
+ services:
+ postgres:
+ image: postgres:13
+ env:
+ POSTGRES_USER: user
+ POSTGRES_PASSWORD: password
+ POSTGRES_DB: testdb
+ ports:
+ - 5432:5432
+ # needed because the postgres container does not provide a healthcheck
+ options: >-
+ --health-cmd pg_isready
+ --health-interval 10s
+ --health-timeout 5s
+ --health-retries 5
+
+ steps:
+ - uses: actions/checkout@v3
+ - name: Setup Rust toolchain
+ uses: ./.github/actions/setup
+ with:
+ need-nextest: true
+
+ - name: Setup PostgreSQL CLI
+ run: |
+ sudo apt -y install postgresql-client
+ psql -V
+
+ - name: Create table
+ run: |
+ export PGPASSWORD=password
+ psql -h localhost -U user -d testdb -c "CREATE TABLE data (key TEXT
PRIMARY KEY, value BYTEA);"
+
+ - name: Test
+ shell: bash
+ working-directory: core
+ run: cargo nextest run postgresql --features services-postgresql
+ env:
+ OPENDAL_POSTGRESQL_TEST: on
+ OPENDAL_POSTGRESQL_CONNECTION_STRING:
postgresql://user:password@localhost:5432/testdb
+ OPENDAL_POSTGRESQL_TABLE: data
+ OPENDAL_POSTGRESQL_KEY_FIELD: key
+ OPENDAL_POSTGRESQL_VALUE_FIELD: value
diff --git a/Cargo.lock b/Cargo.lock
index 2963f8ed2..2d5408af1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1638,6 +1638,12 @@ dependencies = [
"rand 0.7.3",
]
+[[package]]
+name = "fallible-iterator"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
+
[[package]]
name = "fastrand"
version = "1.9.0"
@@ -3479,6 +3485,7 @@ dependencies = [
"suppaftp",
"tikv-client",
"tokio",
+ "tokio-postgres",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -4060,6 +4067,24 @@ dependencies = [
"indexmap 1.9.3",
]
+[[package]]
+name = "phf"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
+dependencies = [
+ "phf_shared",
+]
+
+[[package]]
+name = "phf_shared"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
+dependencies = [
+ "siphasher",
+]
+
[[package]]
name = "pin-project"
version = "1.1.2"
@@ -4178,6 +4203,35 @@ version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "767eb9f07d4a5ebcb39bbf2d452058a93c011373abf6832e24194a1c3f004794"
+[[package]]
+name = "postgres-protocol"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d"
+dependencies = [
+ "base64 0.21.2",
+ "byteorder",
+ "bytes",
+ "fallible-iterator",
+ "hmac",
+ "md-5",
+ "memchr",
+ "rand 0.8.5",
+ "sha2",
+ "stringprep",
+]
+
+[[package]]
+name = "postgres-types"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6"
+dependencies = [
+ "bytes",
+ "fallible-iterator",
+ "postgres-protocol",
+]
+
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@@ -5316,6 +5370,12 @@ dependencies = [
"time 0.3.22",
]
+[[package]]
+name = "siphasher"
+version = "0.3.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
+
[[package]]
name = "size"
version = "0.4.1"
@@ -5484,6 +5544,16 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
+[[package]]
+name = "stringprep"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "db3737bde7edce97102e0e2b15365bf7a20bfdb5f60f4f9e8d7004258a51a8da"
+dependencies = [
+ "unicode-bidi",
+ "unicode-normalization",
+]
+
[[package]]
name = "strsim"
version = "0.10.0"
@@ -5826,6 +5896,30 @@ dependencies = [
"tokio",
]
+[[package]]
+name = "tokio-postgres"
+version = "0.7.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e89f6234aa8fd43779746012fcf53603cdb91fdd8399aa0de868c2d56b6dde1"
+dependencies = [
+ "async-trait",
+ "byteorder",
+ "bytes",
+ "fallible-iterator",
+ "futures-channel",
+ "futures-util",
+ "log",
+ "parking_lot 0.12.1",
+ "percent-encoding",
+ "phf",
+ "pin-project-lite",
+ "postgres-protocol",
+ "postgres-types",
+ "socket2 0.5.3",
+ "tokio",
+ "tokio-util",
+]
+
[[package]]
name = "tokio-rustls"
version = "0.24.1"
diff --git a/bin/oay/Cargo.toml b/bin/oay/Cargo.toml
index 8556a0418..9bdf490a6 100644
--- a/bin/oay/Cargo.toml
+++ b/bin/oay/Cargo.toml
@@ -30,25 +30,21 @@ rust-version.workspace = true
version.workspace = true
[features]
-default = [
- "frontends-webdav",
- "frontends-s3"
-]
+default = ["frontends-webdav", "frontends-s3"]
-frontends-webdav = [
- "dep:dav-server",
- "dep:bytes",
- "dep:futures-util"
-]
frontends-s3 = []
+frontends-webdav = ["dep:dav-server", "dep:bytes", "dep:futures-util"]
[dependencies]
anyhow = "1"
axum = "0.6"
+bytes = { version = "1.4.0", optional = true }
chrono = "0.4.26"
clap = { version = "4", features = ["cargo", "string"] }
+dav-server = { version = "0.5.5", optional = true }
dirs = "5.0.0"
futures = "0.3"
+futures-util = { version = "0.3.16", optional = true }
opendal.workspace = true
quick-xml = { version = "0.29", features = ["serialize", "overlapped-lists"] }
serde = { version = "1", features = ["derive"] }
@@ -65,6 +61,3 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.3.1"
uuid = { version = "1", features = ["v4", "fast-rng"] }
-dav-server = { version = "0.5.5", optional = true }
-bytes = { version = "1.4.0", optional = true }
-futures-util = { version = "0.3.16", optional = true }
diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml
index 42b5ed69c..3402aaa59 100644
--- a/bin/oli/Cargo.toml
+++ b/bin/oli/Cargo.toml
@@ -74,4 +74,4 @@ url = "2.3.1"
[dev-dependencies]
assert_cmd = "2"
predicates = "3"
-tempfile = "3.7.1"
\ No newline at end of file
+tempfile = "3.7.1"
diff --git a/bindings/lua/Cargo.toml b/bindings/lua/Cargo.toml
index fed7aa5d3..4661d4f7d 100644
--- a/bindings/lua/Cargo.toml
+++ b/bindings/lua/Cargo.toml
@@ -29,7 +29,6 @@ rust-version.workspace = true
# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
-
[features]
default = ["mlua/lua52"]
lua52 = ["mlua", "mlua/lua52"]
@@ -38,6 +37,7 @@ lua52 = ["mlua", "mlua/lua52"]
crate-type = ["cdylib"]
[dependencies]
-mlua = { version = "0.8", features = ["module"], default-features = false,
optional = true }
+mlua = { version = "0.8", features = [
+ "module",
+], default-features = false, optional = true }
opendal.workspace = true
-
diff --git a/bindings/ocaml/Cargo.toml b/bindings/ocaml/Cargo.toml
index a0dbd4b17..3dffc0471 100644
--- a/bindings/ocaml/Cargo.toml
+++ b/bindings/ocaml/Cargo.toml
@@ -32,9 +32,8 @@ crate-type = ["staticlib", "cdylib"]
doc = false
[dependencies]
+ocaml = { version = "^1.0.0-beta" }
opendal.workspace = true
-ocaml = {version = "^1.0.0-beta"}
[build-dependencies]
-ocaml-build = {version = "^1.0.0-beta"}
-
+ocaml-build = { version = "^1.0.0-beta" }
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 08e0253d9..09087772d 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -150,6 +150,7 @@ services-oss = [
"reqsign?/reqwest_request",
]
services-persy = ["dep:persy"]
+services-postgresql = ["dep:tokio-postgres"]
services-redb = ["dep:redb"]
services-redis = ["dep:redis"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
@@ -200,7 +201,10 @@ cacache = { version = "11.6", default-features = false,
features = [
"tokio-runtime",
"mmap",
], optional = true }
-chrono = { version = "0.4.26", default-features = false, features = ["clock",
"std"] }
+chrono = { version = "0.4.26", default-features = false, features = [
+ "clock",
+ "std",
+] }
dashmap = { version = "5.4", optional = true }
dirs = { version = "5.0.1", optional = true }
etcd-client = { version = "0.11", optional = true, features = ["tls"] }
@@ -256,6 +260,7 @@ suppaftp = { version = "4.5", default-features = false,
features = [
], optional = true }
tikv-client = { version = "0.2.0", optional = true }
tokio = "1.27"
+tokio-postgres = { version = "0.7.8", optional = true }
tracing = { version = "0.1", optional = true }
uuid = { version = "1", features = ["serde", "v4"] }
diff --git a/core/src/raw/adapters/kv/api.rs b/core/src/raw/adapters/kv/api.rs
index dbe5f8f5f..0c1ad4f95 100644
--- a/core/src/raw/adapters/kv/api.rs
+++ b/core/src/raw/adapters/kv/api.rs
@@ -31,7 +31,7 @@ use crate::Scheme;
/// By implement this trait, any kv service can work as an OpenDAL Service.
#[async_trait]
pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
- /// Return the medata of this key value accessor.
+ /// Return the metadata of this key value accessor.
fn metadata(&self) -> Metadata;
/// Get a key from service.
diff --git a/core/src/raw/adapters/mod.rs b/core/src/raw/adapters/mod.rs
index 9864797c0..73dc67bf6 100644
--- a/core/src/raw/adapters/mod.rs
+++ b/core/src/raw/adapters/mod.rs
@@ -43,7 +43,8 @@
//!
//! # Available Adapters
//!
-//! - [`kv::Adapter`]: Adapter for Key Value Services like in-memory map,
`redis`.
+//! - [`kv::Adapter`]: Adapter for Key Value Services like `redis`.
+//! - [`typed_kv::Adapter`]: Adapter key key value services that in-memory.
pub mod kv;
pub mod typed_kv;
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 7545b71c5..e3236ef59 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -203,3 +203,8 @@ pub use self::tikv::Tikv;
mod foundationdb;
#[cfg(feature = "services-foundationdb")]
pub use self::foundationdb::Foundationdb;
+
+#[cfg(feature = "services-postgresql")]
+mod postgresql;
+#[cfg(feature = "services-postgresql")]
+pub use self::postgresql::Postgresql;
diff --git a/core/src/services/postgresql/backend.rs
b/core/src/services/postgresql/backend.rs
new file mode 100644
index 000000000..7600a5bde
--- /dev/null
+++ b/core/src/services/postgresql/backend.rs
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::adapters::kv;
+use crate::raw::*;
+use crate::*;
+use async_trait::async_trait;
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use std::str::FromStr;
+use std::sync::Arc;
+use tokio::sync::OnceCell;
+use tokio_postgres::{Client, Config, Statement};
+
+/// [Postgresql](https://www.postgresql.org/) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct PostgresqlBuilder {
+ connection_string: Option<String>,
+
+ table: Option<String>,
+ key_field: Option<String>,
+ value_field: Option<String>,
+ root: Option<String>,
+}
+
+impl Debug for PostgresqlBuilder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("Builder");
+ ds.field("table", &self.table);
+ ds.finish()
+ }
+}
+
+impl PostgresqlBuilder {
+ /// Set the connection_string of the postgresql service.
+ ///
+ /// This connection string is `libpq-style connection strings`. There are
two formats:
+ ///
+ /// ## Key Value
+ ///
+ /// This format consists of space-separated key-value pairs. Values which
are either the empty
+ /// string or contain whitespace should be wrapped in '. ' and \
characters should be
+ /// backslash-escaped.
+ ///
+ /// - `host=localhost user=postgres connect_timeout=10 keepalives=0`
+ /// - `host=/var/lib/postgresql,localhost port=1234 user=postgres
password='password with spaces'`
+ /// - `host=host1,host2,host3 port=1234,,5678 user=postgres
target_session_attrs=read-write`
+ ///
+ /// Available keys could found at:
<https://docs.rs/postgres/latest/postgres/config/struct.Config.html>
+ ///
+ /// ## Url
+ ///
+ /// This format resembles a URL with a scheme of either `postgres://` or
`postgresql://`.
+ ///
+ /// - `postgresql://user@localhost`
+ /// -
`postgresql://user:password@%2Fvar%2Flib%2Fpostgresql/mydb?connect_timeout=10`
+ /// -
`postgresql://user@host1:1234,host2,host3:5678?target_session_attrs=read-write`
+ /// - `postgresql:///mydb?user=user&host=/var/lib/postgresql`
+ ///
+ /// # Notes
+ ///
+ /// If connection_string has been specified, other parameters will be
ignored.
+ ///
+ /// For more information, please visit
<https://docs.rs/postgres/latest/postgres/config/struct.Config.html>
+ pub fn connection_string(&mut self, v: &str) -> &mut Self {
+ if !v.is_empty() {
+ self.connection_string = Some(v.to_string());
+ }
+ self
+ }
+
+ /// set the working directory, all operations will be performed under it.
+ ///
+ /// default: "/"
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ if !root.is_empty() {
+ self.root = Some(root.to_owned());
+ }
+ self
+ }
+
+ /// Set the table name of the postgresql service to read/write.
+ pub fn table(&mut self, table: &str) -> &mut Self {
+ if !table.is_empty() {
+ self.table = Some(table.to_string());
+ }
+ self
+ }
+
+ /// Set the key field name of the postgresql service to read/write.
+ ///
+ /// Default to `key` if not specified.
+ pub fn key_field(&mut self, key_field: &str) -> &mut Self {
+ if !key_field.is_empty() {
+ self.key_field = Some(key_field.to_string());
+ }
+ self
+ }
+
+ /// Set the value field name of the postgresql service to read/write.
+ ///
+ /// Default to `value` if not specified.
+ pub fn value_field(&mut self, value_field: &str) -> &mut Self {
+ if !value_field.is_empty() {
+ self.value_field = Some(value_field.to_string());
+ }
+ self
+ }
+}
+
+impl Builder for PostgresqlBuilder {
+ const SCHEME: Scheme = Scheme::Postgresql;
+ type Accessor = PostgresqlBackend;
+
+ fn from_map(map: HashMap<String, String>) -> Self {
+ let mut builder = PostgresqlBuilder::default();
+
+ map.get("connection_string")
+ .map(|v| builder.connection_string(v));
+ map.get("table").map(|v| builder.table(v));
+ map.get("key_field").map(|v| builder.key_field(v));
+ map.get("value_field").map(|v| builder.value_field(v));
+ map.get("root").map(|v| builder.root(v));
+
+ builder
+ }
+
+ fn build(&mut self) -> Result<Self::Accessor> {
+ let conn = match self.connection_string.clone() {
+ Some(v) => v,
+ None => {
+ return Err(
+ Error::new(ErrorKind::ConfigInvalid, "connection_string is
empty")
+ .with_context("service", Scheme::Postgresql),
+ )
+ }
+ };
+
+ let config = Config::from_str(&conn).map_err(|err| {
+ Error::new(ErrorKind::ConfigInvalid, "connection_string is
invalid")
+ .with_context("service", Scheme::Postgresql)
+ .set_source(err)
+ })?;
+
+ let table = match self.table.clone() {
+ Some(v) => v,
+ None => {
+ return Err(Error::new(ErrorKind::ConfigInvalid, "table is
empty")
+ .with_context("service", Scheme::Postgresql))
+ }
+ };
+ let key_field = match self.key_field.clone() {
+ Some(v) => v,
+ None => "key".to_string(),
+ };
+ let value_field = match self.value_field.clone() {
+ Some(v) => v,
+ None => "value".to_string(),
+ };
+ let root = normalize_root(
+ self.root
+ .clone()
+ .unwrap_or_else(|| "/".to_string())
+ .as_str(),
+ );
+
+ Ok(PostgresqlBackend::new(Adapter {
+ client: OnceCell::new(),
+ config,
+ table,
+ key_field,
+ value_field,
+
+ statement_get: OnceCell::new(),
+ statement_set: OnceCell::new(),
+ statement_del: OnceCell::new(),
+ })
+ .with_root(&root))
+ }
+}
+
+/// Backend for Postgresql service
+pub type PostgresqlBackend = kv::Backend<Adapter>;
+
+#[derive(Clone)]
+pub struct Adapter {
+ client: OnceCell<Arc<Client>>,
+ config: Config,
+
+ table: String,
+ key_field: String,
+ value_field: String,
+
+ /// Prepared statements for get/put/delete.
+ statement_get: OnceCell<Statement>,
+ statement_set: OnceCell<Statement>,
+ statement_del: OnceCell<Statement>,
+}
+
+impl Debug for Adapter {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("Adapter");
+ ds.finish_non_exhaustive()
+ }
+}
+
+impl Adapter {
+ async fn get_client(&self) -> Result<&Client> {
+ self.client
+ .get_or_try_init(|| async {
+ // TODO: add tls support.
+ let (client, conn) =
self.config.connect(tokio_postgres::NoTls).await?;
+
+ // The connection object performs the actual communication
with the database,
+ // so spawn it off to run on its own.
+ tokio::spawn(async move {
+ if let Err(e) = conn.await {
+ eprintln!("postgresql connection error: {}", e);
+ }
+ });
+
+ Ok(Arc::new(client))
+ })
+ .await
+ .map(|v| v.as_ref())
+ }
+}
+
+#[async_trait]
+impl kv::Adapter for Adapter {
+ fn metadata(&self) -> kv::Metadata {
+ kv::Metadata::new(
+ Scheme::Postgresql,
+ &self.table,
+ Capability {
+ read: true,
+ write: true,
+ ..Default::default()
+ },
+ )
+ }
+
+ async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+ let query = format!(
+ "SELECT {} FROM {} WHERE {} = $1 LIMIT 1",
+ self.value_field, self.table, self.key_field
+ );
+ let statement = self
+ .statement_get
+ .get_or_try_init(|| async {
+ self.get_client()
+ .await?
+ .prepare(&query)
+ .await
+ .map_err(Error::from)
+ })
+ .await?;
+
+ let rows = self.get_client().await?.query(statement, &[&path]).await?;
+ if rows.is_empty() {
+ return Ok(None);
+ }
+ let value: Vec<u8> = rows[0].get(0);
+ Ok(Some(value))
+ }
+
+ async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
+ let table = &self.table;
+ let key_field = &self.key_field;
+ let value_field = &self.value_field;
+ let query = format!(
+ "INSERT INTO {table} ({key_field}, {value_field}) \
+ VALUES ($1, $2) \
+ ON CONFLICT ({key_field}) \
+ DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
+ );
+ let statement = self
+ .statement_set
+ .get_or_try_init(|| async {
+ self.get_client()
+ .await?
+ .prepare(&query)
+ .await
+ .map_err(Error::from)
+ })
+ .await?;
+
+ let _ = self
+ .get_client()
+ .await?
+ .query(statement, &[&path, &value])
+ .await?;
+ Ok(())
+ }
+
+ async fn delete(&self, path: &str) -> Result<()> {
+ let query = format!("DELETE FROM {} WHERE {} = $1", self.table,
self.key_field);
+ let statement = self
+ .statement_del
+ .get_or_try_init(|| async {
+ self.get_client()
+ .await?
+ .prepare(&query)
+ .await
+ .map_err(Error::from)
+ })
+ .await?;
+
+ let _ = self.get_client().await?.query(statement, &[&path]).await?;
+ Ok(())
+ }
+}
+
+impl From<tokio_postgres::Error> for Error {
+ fn from(value: tokio_postgres::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "unhandled error from
postgresql").set_source(value)
+ }
+}
diff --git a/core/src/services/postgresql/docs.md
b/core/src/services/postgresql/docs.md
new file mode 100644
index 000000000..35ac3fff2
--- /dev/null
+++ b/core/src/services/postgresql/docs.md
@@ -0,0 +1,23 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [ ] rename
+- [ ] ~~list~~
+- [ ] scan
+- [ ] ~~presign~~
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the working directory of `OpenDAL`
+- `connection_string`: Set the connection string of postgres server
+- `table`: Set the table of postgresql
+- `key_field`: Set the key field of postgresql
+- `value_field`: Set the value field of postgresql
diff --git a/core/src/raw/adapters/mod.rs b/core/src/services/postgresql/mod.rs
similarity index 51%
copy from core/src/raw/adapters/mod.rs
copy to core/src/services/postgresql/mod.rs
index 9864797c0..427152a7e 100644
--- a/core/src/raw/adapters/mod.rs
+++ b/core/src/services/postgresql/mod.rs
@@ -15,35 +15,5 @@
// specific language governing permissions and limitations
// under the License.
-//! Providing adapters and its implementations.
-//!
-//! Adapters in OpenDAL means services that shares similar behaviors. We use
-//! adapter to make those services been implemented more easily. For example,
-//! with [`kv::Adapter`], users only need to implement `get`, `set` for a
service.
-//!
-//! # Notes
-//!
-//! Please import the module instead of its type.
-//!
-//! For example, use the following:
-//!
-//! ```ignore
-//! use opendal::adapters::kv;
-//!
-//! impl kv::Adapter for MyType {}
-//! ```
-//!
-//! Instead of:
-//!
-//! ```ignore
-//! use opendal::adapters::kv::Adapter;
-//!
-//! impl Adapter for MyType {}
-//! ```
-//!
-//! # Available Adapters
-//!
-//! - [`kv::Adapter`]: Adapter for Key Value Services like in-memory map,
`redis`.
-
-pub mod kv;
-pub mod typed_kv;
+mod backend;
+pub use backend::PostgresqlBuilder as Postgresql;
diff --git a/core/src/services/tikv/fixtures/pd.toml
b/core/src/services/tikv/fixtures/pd.toml
index 39393f698..613d1bf8e 100644
--- a/core/src/services/tikv/fixtures/pd.toml
+++ b/core/src/services/tikv/fixtures/pd.toml
@@ -16,5 +16,5 @@
# under the License.
[schedule]
+max-merge-region-keys = 3
max-merge-region-size = 1
-max-merge-region-keys = 3
\ No newline at end of file
diff --git a/core/src/services/tikv/fixtures/tikv.toml
b/core/src/services/tikv/fixtures/tikv.toml
index 2fc4204a5..62e194bf4 100644
--- a/core/src/services/tikv/fixtures/tikv.toml
+++ b/core/src/services/tikv/fixtures/tikv.toml
@@ -16,19 +16,19 @@
# under the License.
[coprocessor]
+batch-split-limit = 100
region-max-keys = 10
region-split-keys = 7
-batch-split-limit = 100
[raftstore]
-region-split-check-diff = "1B"
pd-heartbeat-tick-interval = "2s"
pd-store-heartbeat-tick-interval = "5s"
-split-region-check-tick-interval = "1s"
raft-entry-max-size = "10MB"
+region-split-check-diff = "1B"
+split-region-check-tick-interval = "1s"
[rocksdb]
max-open-files = 10000
[raftdb]
-max-open-files = 10000
\ No newline at end of file
+max-open-files = 10000
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 4eabb5e25..4fb0303e8 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -84,6 +84,8 @@ pub enum Scheme {
Persy,
/// [redis][crate::services::Redis]: Redis services
Redis,
+ /// [postgresql][crate::services::Postgresql]: Postgresql services
+ Postgresql,
/// [rocksdb][crate::services::Rocksdb]: RocksDB services
Rocksdb,
/// [s3][crate::services::S3]: AWS S3 alike services.
@@ -163,6 +165,7 @@ impl FromStr for Scheme {
"obs" => Ok(Scheme::Obs),
"onedrive" => Ok(Scheme::Onedrive),
"persy" => Ok(Scheme::Persy),
+ "postgresql" => Ok(Scheme::Postgresql),
"redb" => Ok(Scheme::Redb),
"redis" => Ok(Scheme::Redis),
"rocksdb" => Ok(Scheme::Rocksdb),
@@ -206,6 +209,7 @@ impl From<Scheme> for &'static str {
Scheme::Obs => "obs",
Scheme::Onedrive => "onedrive",
Scheme::Persy => "persy",
+ Scheme::Postgresql => "postgresql",
Scheme::Gdrive => "gdrive",
Scheme::Dropbox => "dropbox",
Scheme::Redis => "redis",
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index ca8b2e61f..facbd34a0 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -139,6 +139,8 @@ fn main() -> anyhow::Result<()> {
tests.extend(behavior_test::<services::Obs>());
#[cfg(feature = "services-onedrive")]
tests.extend(behavior_test::<services::Onedrive>());
+ #[cfg(feature = "services-postgresql")]
+ tests.extend(behavior_test::<services::Postgresql>());
#[cfg(feature = "services-gdrive")]
tests.extend(behavior_test::<services::Gdrive>());
#[cfg(feature = "services-dropbox")]