This is an automated email from the ASF dual-hosted git repository.

junouyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 66f98cd8a feat: Add postgresql support for OpenDAL (#2815)
66f98cd8a is described below

commit 66f98cd8aa9d14572764b946e538f2047252f6d2
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 9 17:52:27 2023 +0800

    feat: Add postgresql support for OpenDAL (#2815)
    
    * Fix typo
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Save work
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add test
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add password
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add postgres
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * FIx build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix build
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Add diff check
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * try
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix conflict
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Format toml
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 .github/workflows/ci.yml                           |   4 +
 .github/workflows/service_test_postgresql.yml      |  85 ++++++
 Cargo.lock                                         |  94 ++++++
 bin/oay/Cargo.toml                                 |  17 +-
 bin/oli/Cargo.toml                                 |   2 +-
 bindings/lua/Cargo.toml                            |   6 +-
 bindings/ocaml/Cargo.toml                          |   5 +-
 core/Cargo.toml                                    |   7 +-
 core/src/raw/adapters/kv/api.rs                    |   2 +-
 core/src/raw/adapters/mod.rs                       |   3 +-
 core/src/services/mod.rs                           |   5 +
 core/src/services/postgresql/backend.rs            | 333 +++++++++++++++++++++
 core/src/services/postgresql/docs.md               |  23 ++
 .../{raw/adapters => services/postgresql}/mod.rs   |  34 +--
 core/src/services/tikv/fixtures/pd.toml            |   2 +-
 core/src/services/tikv/fixtures/tikv.toml          |   8 +-
 core/src/types/scheme.rs                           |   4 +
 core/tests/behavior/main.rs                        |   2 +
 18 files changed, 577 insertions(+), 59 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 001a3d7b3..5286397b9 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -102,12 +102,16 @@ jobs:
         with:
           distribution: temurin
           java-version: "11"
+
       - name: Setup OCaml toolchain
         uses: ./.github/actions/setup-ocaml
 
       - name: Cargo clippy
         run: cargo clippy --all-targets --all-features --workspace -- -D 
warnings
 
+      - name: Check diff
+        run: git diff --exit-code
+
   check_msrv:
     runs-on: ubuntu-latest
     env:
diff --git a/.github/workflows/service_test_postgresql.yml 
b/.github/workflows/service_test_postgresql.yml
new file mode 100644
index 000000000..2e3eebde9
--- /dev/null
+++ b/.github/workflows/service_test_postgresql.yml
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: Service Test Postgresql
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - main
+    paths:
+      - "core/src/**"
+      - "core/tests/**"
+      - "!core/src/docs/**"
+      - "!core/src/services/**"
+      - "core/src/services/postgresql/**"
+      - ".github/workflows/service_test_postgresql.yml"
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event_name }}
+  cancel-in-progress: true
+
+jobs:
+  postgresql:
+    runs-on: ubuntu-latest
+
+    services:
+      postgres:
+        image: postgres:13
+        env:
+          POSTGRES_USER: user
+          POSTGRES_PASSWORD: password
+          POSTGRES_DB: testdb
+        ports:
+          - 5432:5432
+        # needed because the postgres container does not provide a healthcheck
+        options: >-
+          --health-cmd pg_isready
+          --health-interval 10s
+          --health-timeout 5s
+          --health-retries 5
+
+    steps:
+      - uses: actions/checkout@v3
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup
+        with:
+          need-nextest: true
+
+      - name: Setup PostgreSQL CLI
+        run: |
+          sudo apt -y install postgresql-client
+          psql -V
+
+      - name: Create table
+        run: |
+          export PGPASSWORD=password
+          psql -h localhost -U user -d testdb -c "CREATE TABLE data (key TEXT 
PRIMARY KEY, value BYTEA);"
+
+      - name: Test
+        shell: bash
+        working-directory: core
+        run: cargo nextest run postgresql --features services-postgresql
+        env:
+          OPENDAL_POSTGRESQL_TEST: on
+          OPENDAL_POSTGRESQL_CONNECTION_STRING: 
postgresql://user:password@localhost:5432/testdb
+          OPENDAL_POSTGRESQL_TABLE: data
+          OPENDAL_POSTGRESQL_KEY_FIELD: key
+          OPENDAL_POSTGRESQL_VALUE_FIELD: value
diff --git a/Cargo.lock b/Cargo.lock
index 2963f8ed2..2d5408af1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1638,6 +1638,12 @@ dependencies = [
  "rand 0.7.3",
 ]
 
+[[package]]
+name = "fallible-iterator"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
+
 [[package]]
 name = "fastrand"
 version = "1.9.0"
@@ -3479,6 +3485,7 @@ dependencies = [
  "suppaftp",
  "tikv-client",
  "tokio",
+ "tokio-postgres",
  "tracing",
  "tracing-opentelemetry",
  "tracing-subscriber",
@@ -4060,6 +4067,24 @@ dependencies = [
  "indexmap 1.9.3",
 ]
 
+[[package]]
+name = "phf"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
+dependencies = [
+ "phf_shared",
+]
+
+[[package]]
+name = "phf_shared"
+version = "0.11.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
+dependencies = [
+ "siphasher",
+]
+
 [[package]]
 name = "pin-project"
 version = "1.1.2"
@@ -4178,6 +4203,35 @@ version = "1.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "767eb9f07d4a5ebcb39bbf2d452058a93c011373abf6832e24194a1c3f004794"
 
+[[package]]
+name = "postgres-protocol"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "78b7fa9f396f51dffd61546fd8573ee20592287996568e6175ceb0f8699ad75d"
+dependencies = [
+ "base64 0.21.2",
+ "byteorder",
+ "bytes",
+ "fallible-iterator",
+ "hmac",
+ "md-5",
+ "memchr",
+ "rand 0.8.5",
+ "sha2",
+ "stringprep",
+]
+
+[[package]]
+name = "postgres-types"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f028f05971fe20f512bcc679e2c10227e57809a3af86a7606304435bc8896cd6"
+dependencies = [
+ "bytes",
+ "fallible-iterator",
+ "postgres-protocol",
+]
+
 [[package]]
 name = "ppv-lite86"
 version = "0.2.17"
@@ -5316,6 +5370,12 @@ dependencies = [
  "time 0.3.22",
 ]
 
+[[package]]
+name = "siphasher"
+version = "0.3.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
+
 [[package]]
 name = "size"
 version = "0.4.1"
@@ -5484,6 +5544,16 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
 
+[[package]]
+name = "stringprep"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "db3737bde7edce97102e0e2b15365bf7a20bfdb5f60f4f9e8d7004258a51a8da"
+dependencies = [
+ "unicode-bidi",
+ "unicode-normalization",
+]
+
 [[package]]
 name = "strsim"
 version = "0.10.0"
@@ -5826,6 +5896,30 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "tokio-postgres"
+version = "0.7.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6e89f6234aa8fd43779746012fcf53603cdb91fdd8399aa0de868c2d56b6dde1"
+dependencies = [
+ "async-trait",
+ "byteorder",
+ "bytes",
+ "fallible-iterator",
+ "futures-channel",
+ "futures-util",
+ "log",
+ "parking_lot 0.12.1",
+ "percent-encoding",
+ "phf",
+ "pin-project-lite",
+ "postgres-protocol",
+ "postgres-types",
+ "socket2 0.5.3",
+ "tokio",
+ "tokio-util",
+]
+
 [[package]]
 name = "tokio-rustls"
 version = "0.24.1"
diff --git a/bin/oay/Cargo.toml b/bin/oay/Cargo.toml
index 8556a0418..9bdf490a6 100644
--- a/bin/oay/Cargo.toml
+++ b/bin/oay/Cargo.toml
@@ -30,25 +30,21 @@ rust-version.workspace = true
 version.workspace = true
 
 [features]
-default = [
-  "frontends-webdav",
-  "frontends-s3"
-]
+default = ["frontends-webdav", "frontends-s3"]
 
-frontends-webdav = [
-  "dep:dav-server",
-  "dep:bytes",
-  "dep:futures-util"
-]
 frontends-s3 = []
+frontends-webdav = ["dep:dav-server", "dep:bytes", "dep:futures-util"]
 
 [dependencies]
 anyhow = "1"
 axum = "0.6"
+bytes = { version = "1.4.0", optional = true }
 chrono = "0.4.26"
 clap = { version = "4", features = ["cargo", "string"] }
+dav-server = { version = "0.5.5", optional = true }
 dirs = "5.0.0"
 futures = "0.3"
+futures-util = { version = "0.3.16", optional = true }
 opendal.workspace = true
 quick-xml = { version = "0.29", features = ["serialize", "overlapped-lists"] }
 serde = { version = "1", features = ["derive"] }
@@ -65,6 +61,3 @@ tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
 url = "2.3.1"
 uuid = { version = "1", features = ["v4", "fast-rng"] }
-dav-server = { version = "0.5.5", optional = true }
-bytes = { version = "1.4.0", optional = true }
-futures-util = { version = "0.3.16", optional = true }
diff --git a/bin/oli/Cargo.toml b/bin/oli/Cargo.toml
index 42b5ed69c..3402aaa59 100644
--- a/bin/oli/Cargo.toml
+++ b/bin/oli/Cargo.toml
@@ -74,4 +74,4 @@ url = "2.3.1"
 [dev-dependencies]
 assert_cmd = "2"
 predicates = "3"
-tempfile = "3.7.1"
\ No newline at end of file
+tempfile = "3.7.1"
diff --git a/bindings/lua/Cargo.toml b/bindings/lua/Cargo.toml
index fed7aa5d3..4661d4f7d 100644
--- a/bindings/lua/Cargo.toml
+++ b/bindings/lua/Cargo.toml
@@ -29,7 +29,6 @@ rust-version.workspace = true
 
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 
-
 [features]
 default = ["mlua/lua52"]
 lua52 = ["mlua", "mlua/lua52"]
@@ -38,6 +37,7 @@ lua52 = ["mlua", "mlua/lua52"]
 crate-type = ["cdylib"]
 
 [dependencies]
-mlua = { version = "0.8", features = ["module"], default-features = false, 
optional = true }
+mlua = { version = "0.8", features = [
+  "module",
+], default-features = false, optional = true }
 opendal.workspace = true
-
diff --git a/bindings/ocaml/Cargo.toml b/bindings/ocaml/Cargo.toml
index a0dbd4b17..3dffc0471 100644
--- a/bindings/ocaml/Cargo.toml
+++ b/bindings/ocaml/Cargo.toml
@@ -32,9 +32,8 @@ crate-type = ["staticlib", "cdylib"]
 doc = false
 
 [dependencies]
+ocaml = { version = "^1.0.0-beta" }
 opendal.workspace = true
-ocaml = {version = "^1.0.0-beta"}
 
 [build-dependencies]
-ocaml-build = {version = "^1.0.0-beta"}
-
+ocaml-build = { version = "^1.0.0-beta" }
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 08e0253d9..09087772d 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -150,6 +150,7 @@ services-oss = [
   "reqsign?/reqwest_request",
 ]
 services-persy = ["dep:persy"]
+services-postgresql = ["dep:tokio-postgres"]
 services-redb = ["dep:redb"]
 services-redis = ["dep:redis"]
 services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
@@ -200,7 +201,10 @@ cacache = { version = "11.6", default-features = false, 
features = [
   "tokio-runtime",
   "mmap",
 ], optional = true }
-chrono = { version = "0.4.26", default-features = false, features = ["clock", 
"std"] }
+chrono = { version = "0.4.26", default-features = false, features = [
+  "clock",
+  "std",
+] }
 dashmap = { version = "5.4", optional = true }
 dirs = { version = "5.0.1", optional = true }
 etcd-client = { version = "0.11", optional = true, features = ["tls"] }
@@ -256,6 +260,7 @@ suppaftp = { version = "4.5", default-features = false, 
features = [
 ], optional = true }
 tikv-client = { version = "0.2.0", optional = true }
 tokio = "1.27"
+tokio-postgres = { version = "0.7.8", optional = true }
 tracing = { version = "0.1", optional = true }
 uuid = { version = "1", features = ["serde", "v4"] }
 
diff --git a/core/src/raw/adapters/kv/api.rs b/core/src/raw/adapters/kv/api.rs
index dbe5f8f5f..0c1ad4f95 100644
--- a/core/src/raw/adapters/kv/api.rs
+++ b/core/src/raw/adapters/kv/api.rs
@@ -31,7 +31,7 @@ use crate::Scheme;
 /// By implement this trait, any kv service can work as an OpenDAL Service.
 #[async_trait]
 pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
-    /// Return the medata of this key value accessor.
+    /// Return the metadata of this key value accessor.
     fn metadata(&self) -> Metadata;
 
     /// Get a key from service.
diff --git a/core/src/raw/adapters/mod.rs b/core/src/raw/adapters/mod.rs
index 9864797c0..73dc67bf6 100644
--- a/core/src/raw/adapters/mod.rs
+++ b/core/src/raw/adapters/mod.rs
@@ -43,7 +43,8 @@
 //!
 //! # Available Adapters
 //!
-//! - [`kv::Adapter`]: Adapter for Key Value Services like in-memory map, 
`redis`.
+//! - [`kv::Adapter`]: Adapter for Key Value Services like `redis`.
+//! - [`typed_kv::Adapter`]: Adapter for key value services that are in-memory.
 
 pub mod kv;
 pub mod typed_kv;
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 7545b71c5..e3236ef59 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -203,3 +203,8 @@ pub use self::tikv::Tikv;
 mod foundationdb;
 #[cfg(feature = "services-foundationdb")]
 pub use self::foundationdb::Foundationdb;
+
+#[cfg(feature = "services-postgresql")]
+mod postgresql;
+#[cfg(feature = "services-postgresql")]
+pub use self::postgresql::Postgresql;
diff --git a/core/src/services/postgresql/backend.rs 
b/core/src/services/postgresql/backend.rs
new file mode 100644
index 000000000..7600a5bde
--- /dev/null
+++ b/core/src/services/postgresql/backend.rs
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::adapters::kv;
+use crate::raw::*;
+use crate::*;
+use async_trait::async_trait;
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use std::str::FromStr;
+use std::sync::Arc;
+use tokio::sync::OnceCell;
+use tokio_postgres::{Client, Config, Statement};
+
+/// [Postgresql](https://www.postgresql.org/) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct PostgresqlBuilder {
+    connection_string: Option<String>,
+
+    table: Option<String>,
+    key_field: Option<String>,
+    value_field: Option<String>,
+    root: Option<String>,
+}
+
+impl Debug for PostgresqlBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Builder");
+        ds.field("table", &self.table);
+        ds.finish()
+    }
+}
+
+impl PostgresqlBuilder {
+    /// Set the connection_string of the postgresql service.
+    ///
+    /// This connection string is `libpq-style connection strings`. There are 
two formats:
+    ///
+    /// ## Key Value
+    ///
+    /// This format consists of space-separated key-value pairs. Values which 
are either the empty
+    /// string or contain whitespace should be wrapped in `'`. `'` and `\` 
characters should be
+    /// backslash-escaped.
+    ///
+    /// - `host=localhost user=postgres connect_timeout=10 keepalives=0`
+    /// - `host=/var/lib/postgresql,localhost port=1234 user=postgres 
password='password with spaces'`
+    /// - `host=host1,host2,host3 port=1234,,5678 user=postgres 
target_session_attrs=read-write`
+    ///
+    /// Available keys can be found at: 
<https://docs.rs/postgres/latest/postgres/config/struct.Config.html>
+    ///
+    /// ## Url
+    ///
+    /// This format resembles a URL with a scheme of either `postgres://` or 
`postgresql://`.
+    ///
+    /// - `postgresql://user@localhost`
+    /// - 
`postgresql://user:password@%2Fvar%2Flib%2Fpostgresql/mydb?connect_timeout=10`
+    /// - 
`postgresql://user@host1:1234,host2,host3:5678?target_session_attrs=read-write`
+    /// - `postgresql:///mydb?user=user&host=/var/lib/postgresql`
+    ///
+    /// # Notes
+    ///
+    /// If connection_string has been specified, other parameters will be 
ignored.
+    ///
+    /// For more information, please visit 
<https://docs.rs/postgres/latest/postgres/config/struct.Config.html>
+    pub fn connection_string(&mut self, v: &str) -> &mut Self {
+        if !v.is_empty() {
+            self.connection_string = Some(v.to_string());
+        }
+        self
+    }
+
+    /// Set the working directory, all operations will be performed under it.
+    ///
+    /// default: "/"
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        if !root.is_empty() {
+            self.root = Some(root.to_owned());
+        }
+        self
+    }
+
+    /// Set the table name of the postgresql service to read/write.
+    pub fn table(&mut self, table: &str) -> &mut Self {
+        if !table.is_empty() {
+            self.table = Some(table.to_string());
+        }
+        self
+    }
+
+    /// Set the key field name of the postgresql service to read/write.
+    ///
+    /// Default to `key` if not specified.
+    pub fn key_field(&mut self, key_field: &str) -> &mut Self {
+        if !key_field.is_empty() {
+            self.key_field = Some(key_field.to_string());
+        }
+        self
+    }
+
+    /// Set the value field name of the postgresql service to read/write.
+    ///
+    /// Default to `value` if not specified.
+    pub fn value_field(&mut self, value_field: &str) -> &mut Self {
+        if !value_field.is_empty() {
+            self.value_field = Some(value_field.to_string());
+        }
+        self
+    }
+}
+
+impl Builder for PostgresqlBuilder {
+    const SCHEME: Scheme = Scheme::Postgresql;
+    type Accessor = PostgresqlBackend;
+
+    fn from_map(map: HashMap<String, String>) -> Self {
+        let mut builder = PostgresqlBuilder::default();
+
+        map.get("connection_string")
+            .map(|v| builder.connection_string(v));
+        map.get("table").map(|v| builder.table(v));
+        map.get("key_field").map(|v| builder.key_field(v));
+        map.get("value_field").map(|v| builder.value_field(v));
+        map.get("root").map(|v| builder.root(v));
+
+        builder
+    }
+
+    fn build(&mut self) -> Result<Self::Accessor> {
+        let conn = match self.connection_string.clone() {
+            Some(v) => v,
+            None => {
+                return Err(
+                    Error::new(ErrorKind::ConfigInvalid, "connection_string is 
empty")
+                        .with_context("service", Scheme::Postgresql),
+                )
+            }
+        };
+
+        let config = Config::from_str(&conn).map_err(|err| {
+            Error::new(ErrorKind::ConfigInvalid, "connection_string is 
invalid")
+                .with_context("service", Scheme::Postgresql)
+                .set_source(err)
+        })?;
+
+        let table = match self.table.clone() {
+            Some(v) => v,
+            None => {
+                return Err(Error::new(ErrorKind::ConfigInvalid, "table is 
empty")
+                    .with_context("service", Scheme::Postgresql))
+            }
+        };
+        let key_field = match self.key_field.clone() {
+            Some(v) => v,
+            None => "key".to_string(),
+        };
+        let value_field = match self.value_field.clone() {
+            Some(v) => v,
+            None => "value".to_string(),
+        };
+        let root = normalize_root(
+            self.root
+                .clone()
+                .unwrap_or_else(|| "/".to_string())
+                .as_str(),
+        );
+
+        Ok(PostgresqlBackend::new(Adapter {
+            client: OnceCell::new(),
+            config,
+            table,
+            key_field,
+            value_field,
+
+            statement_get: OnceCell::new(),
+            statement_set: OnceCell::new(),
+            statement_del: OnceCell::new(),
+        })
+        .with_root(&root))
+    }
+}
+
+/// Backend for Postgresql service
+pub type PostgresqlBackend = kv::Backend<Adapter>;
+
+#[derive(Clone)]
+pub struct Adapter {
+    client: OnceCell<Arc<Client>>,
+    config: Config,
+
+    table: String,
+    key_field: String,
+    value_field: String,
+
+    /// Prepared statements for get/put/delete.
+    statement_get: OnceCell<Statement>,
+    statement_set: OnceCell<Statement>,
+    statement_del: OnceCell<Statement>,
+}
+
+impl Debug for Adapter {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Adapter");
+        ds.finish_non_exhaustive()
+    }
+}
+
+impl Adapter {
+    async fn get_client(&self) -> Result<&Client> {
+        self.client
+            .get_or_try_init(|| async {
+                // TODO: add tls support.
+                let (client, conn) = 
self.config.connect(tokio_postgres::NoTls).await?;
+
+                // The connection object performs the actual communication 
with the database,
+                // so spawn it off to run on its own.
+                tokio::spawn(async move {
+                    if let Err(e) = conn.await {
+                        eprintln!("postgresql connection error: {}", e);
+                    }
+                });
+
+                Ok(Arc::new(client))
+            })
+            .await
+            .map(|v| v.as_ref())
+    }
+}
+
+#[async_trait]
+impl kv::Adapter for Adapter {
+    fn metadata(&self) -> kv::Metadata {
+        kv::Metadata::new(
+            Scheme::Postgresql,
+            &self.table,
+            Capability {
+                read: true,
+                write: true,
+                ..Default::default()
+            },
+        )
+    }
+
+    async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+        let query = format!(
+            "SELECT {} FROM {} WHERE {} = $1 LIMIT 1",
+            self.value_field, self.table, self.key_field
+        );
+        let statement = self
+            .statement_get
+            .get_or_try_init(|| async {
+                self.get_client()
+                    .await?
+                    .prepare(&query)
+                    .await
+                    .map_err(Error::from)
+            })
+            .await?;
+
+        let rows = self.get_client().await?.query(statement, &[&path]).await?;
+        if rows.is_empty() {
+            return Ok(None);
+        }
+        let value: Vec<u8> = rows[0].get(0);
+        Ok(Some(value))
+    }
+
+    async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
+        let table = &self.table;
+        let key_field = &self.key_field;
+        let value_field = &self.value_field;
+        let query = format!(
+            "INSERT INTO {table} ({key_field}, {value_field}) \
+                VALUES ($1, $2) \
+                ON CONFLICT ({key_field}) \
+                    DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
+        );
+        let statement = self
+            .statement_set
+            .get_or_try_init(|| async {
+                self.get_client()
+                    .await?
+                    .prepare(&query)
+                    .await
+                    .map_err(Error::from)
+            })
+            .await?;
+
+        let _ = self
+            .get_client()
+            .await?
+            .query(statement, &[&path, &value])
+            .await?;
+        Ok(())
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let query = format!("DELETE FROM {} WHERE {} = $1", self.table, 
self.key_field);
+        let statement = self
+            .statement_del
+            .get_or_try_init(|| async {
+                self.get_client()
+                    .await?
+                    .prepare(&query)
+                    .await
+                    .map_err(Error::from)
+            })
+            .await?;
+
+        let _ = self.get_client().await?.query(statement, &[&path]).await?;
+        Ok(())
+    }
+}
+
+impl From<tokio_postgres::Error> for Error {
+    fn from(value: tokio_postgres::Error) -> Error {
+        Error::new(ErrorKind::Unexpected, "unhandled error from 
postgresql").set_source(value)
+    }
+}
diff --git a/core/src/services/postgresql/docs.md 
b/core/src/services/postgresql/docs.md
new file mode 100644
index 000000000..35ac3fff2
--- /dev/null
+++ b/core/src/services/postgresql/docs.md
@@ -0,0 +1,23 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [ ] rename
+- [ ] ~~list~~
+- [ ] scan
+- [ ] ~~presign~~
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the working directory of `OpenDAL`
+- `connection_string`: Set the connection string of postgres server
+- `table`: Set the table of postgresql
+- `key_field`: Set the key field of postgresql
+- `value_field`: Set the value field of postgresql
diff --git a/core/src/raw/adapters/mod.rs b/core/src/services/postgresql/mod.rs
similarity index 51%
copy from core/src/raw/adapters/mod.rs
copy to core/src/services/postgresql/mod.rs
index 9864797c0..427152a7e 100644
--- a/core/src/raw/adapters/mod.rs
+++ b/core/src/services/postgresql/mod.rs
@@ -15,35 +15,5 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Providing adapters and its implementations.
-//!
-//! Adapters in OpenDAL means services that shares similar behaviors. We use
-//! adapter to make those services been implemented more easily. For example,
-//! with [`kv::Adapter`], users only need to implement `get`, `set` for a 
service.
-//!
-//! # Notes
-//!
-//! Please import the module instead of its type.
-//!
-//! For example, use the following:
-//!
-//! ```ignore
-//! use opendal::adapters::kv;
-//!
-//! impl kv::Adapter for MyType {}
-//! ```
-//!
-//! Instead of:
-//!
-//! ```ignore
-//! use opendal::adapters::kv::Adapter;
-//!
-//! impl Adapter for MyType {}
-//! ```
-//!
-//! # Available Adapters
-//!
-//! - [`kv::Adapter`]: Adapter for Key Value Services like in-memory map, 
`redis`.
-
-pub mod kv;
-pub mod typed_kv;
+mod backend;
+pub use backend::PostgresqlBuilder as Postgresql;
diff --git a/core/src/services/tikv/fixtures/pd.toml 
b/core/src/services/tikv/fixtures/pd.toml
index 39393f698..613d1bf8e 100644
--- a/core/src/services/tikv/fixtures/pd.toml
+++ b/core/src/services/tikv/fixtures/pd.toml
@@ -16,5 +16,5 @@
 # under the License.
 
 [schedule]
+max-merge-region-keys = 3
 max-merge-region-size = 1
-max-merge-region-keys = 3
\ No newline at end of file
diff --git a/core/src/services/tikv/fixtures/tikv.toml 
b/core/src/services/tikv/fixtures/tikv.toml
index 2fc4204a5..62e194bf4 100644
--- a/core/src/services/tikv/fixtures/tikv.toml
+++ b/core/src/services/tikv/fixtures/tikv.toml
@@ -16,19 +16,19 @@
 # under the License.
 
 [coprocessor]
+batch-split-limit = 100
 region-max-keys = 10
 region-split-keys = 7
-batch-split-limit = 100
 
 [raftstore]
-region-split-check-diff = "1B"
 pd-heartbeat-tick-interval = "2s"
 pd-store-heartbeat-tick-interval = "5s"
-split-region-check-tick-interval = "1s"
 raft-entry-max-size = "10MB"
+region-split-check-diff = "1B"
+split-region-check-tick-interval = "1s"
 
 [rocksdb]
 max-open-files = 10000
 
 [raftdb]
-max-open-files = 10000
\ No newline at end of file
+max-open-files = 10000
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 4eabb5e25..4fb0303e8 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -84,6 +84,8 @@ pub enum Scheme {
     Persy,
     /// [redis][crate::services::Redis]: Redis services
     Redis,
+    /// [postgresql][crate::services::Postgresql]: Postgresql services
+    Postgresql,
     /// [rocksdb][crate::services::Rocksdb]: RocksDB services
     Rocksdb,
     /// [s3][crate::services::S3]: AWS S3 alike services.
@@ -163,6 +165,7 @@ impl FromStr for Scheme {
             "obs" => Ok(Scheme::Obs),
             "onedrive" => Ok(Scheme::Onedrive),
             "persy" => Ok(Scheme::Persy),
+            "postgresql" => Ok(Scheme::Postgresql),
             "redb" => Ok(Scheme::Redb),
             "redis" => Ok(Scheme::Redis),
             "rocksdb" => Ok(Scheme::Rocksdb),
@@ -206,6 +209,7 @@ impl From<Scheme> for &'static str {
             Scheme::Obs => "obs",
             Scheme::Onedrive => "onedrive",
             Scheme::Persy => "persy",
+            Scheme::Postgresql => "postgresql",
             Scheme::Gdrive => "gdrive",
             Scheme::Dropbox => "dropbox",
             Scheme::Redis => "redis",
diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs
index ca8b2e61f..facbd34a0 100644
--- a/core/tests/behavior/main.rs
+++ b/core/tests/behavior/main.rs
@@ -139,6 +139,8 @@ fn main() -> anyhow::Result<()> {
     tests.extend(behavior_test::<services::Obs>());
     #[cfg(feature = "services-onedrive")]
     tests.extend(behavior_test::<services::Onedrive>());
+    #[cfg(feature = "services-postgresql")]
+    tests.extend(behavior_test::<services::Postgresql>());
     #[cfg(feature = "services-gdrive")]
     tests.extend(behavior_test::<services::Gdrive>());
     #[cfg(feature = "services-dropbox")]

Reply via email to