This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c2b1a6b3 reafctor: Polish error process code for mysql/postgresql 
services (#3269)
8c2b1a6b3 is described below

commit 8c2b1a6b3f9473130db21c6751d85ca710983402
Author: Nadeshiko Manju <[email protected]>
AuthorDate: Fri Oct 13 16:51:52 2023 +0800

    reafctor: Polish error process code for mysql/postgresql services (#3269)
    
    * reafctor: Polish error process code for mysql/postgresql services
    
    Signed-off-by: Manjusaka <[email protected]>
    
    * Update code
    
    Signed-off-by: Manjusaka <[email protected]>
    
    ---------
    
    Signed-off-by: Manjusaka <[email protected]>
---
 core/src/services/mysql/backend.rs      | 46 +++++++++++----------
 core/src/services/postgresql/backend.rs | 72 ++++++++++++++++++++++++---------
 2 files changed, 77 insertions(+), 41 deletions(-)

diff --git a/core/src/services/mysql/backend.rs 
b/core/src/services/mysql/backend.rs
index d6d8c9966..bc73864b4 100644
--- a/core/src/services/mysql/backend.rs
+++ b/core/src/services/mysql/backend.rs
@@ -222,12 +222,12 @@ impl kv::Adapter for Adapter {
             "SELECT `{}` FROM `{}` WHERE `{}` = :path LIMIT 1",
             self.value_field, self.table, self.key_field
         );
-        let mut conn = self.connection_pool.get_conn().await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "connection 
failed").set_source(err)
-        })?;
-        let statement = conn.prep(query).await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "prepare statement 
failed").set_source(err)
-        })?;
+        let mut conn = self
+            .connection_pool
+            .get_conn()
+            .await
+            .map_err(parse_mysql_error)?;
+        let statement = conn.prep(query).await.map_err(parse_mysql_error)?;
         let result: Option<Vec<u8>> = conn
             .exec_first(
                 statement,
@@ -236,7 +236,7 @@ impl kv::Adapter for Adapter {
                 },
             )
             .await
-            .map_err(|err| Error::new(ErrorKind::Unexpected, "delete 
failed").set_source(err))?;
+            .map_err(parse_mysql_error)?;
         match result {
             Some(v) => Ok(Some(v)),
             None => Ok(None),
@@ -250,12 +250,12 @@ impl kv::Adapter for Adapter {
             ON DUPLICATE KEY UPDATE `{}` = VALUES({})",
             self.table, self.key_field, self.value_field, self.value_field, 
self.value_field
         );
-        let mut conn = self.connection_pool.get_conn().await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "connection 
failed").set_source(err)
-        })?;
-        let statement = conn.prep(query).await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "prepare statement 
failed").set_source(err)
-        })?;
+        let mut conn = self
+            .connection_pool
+            .get_conn()
+            .await
+            .map_err(parse_mysql_error)?;
+        let statement = conn.prep(query).await.map_err(parse_mysql_error)?;
 
         conn.exec_drop(
             statement,
@@ -265,7 +265,7 @@ impl kv::Adapter for Adapter {
             },
         )
         .await
-        .map_err(|err| Error::new(ErrorKind::Unexpected, "set 
failed").set_source(err))?;
+        .map_err(parse_mysql_error)?;
         Ok(())
     }
 
@@ -274,12 +274,12 @@ impl kv::Adapter for Adapter {
             "DELETE FROM `{}` WHERE `{}` = :path",
             self.table, self.key_field
         );
-        let mut conn = self.connection_pool.get_conn().await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "connection 
failed").set_source(err)
-        })?;
-        let statement = conn.prep(query).await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "prepare statement 
failed").set_source(err)
-        })?;
+        let mut conn = self
+            .connection_pool
+            .get_conn()
+            .await
+            .map_err(parse_mysql_error)?;
+        let statement = conn.prep(query).await.map_err(parse_mysql_error)?;
 
         conn.exec_drop(
             statement,
@@ -288,7 +288,11 @@ impl kv::Adapter for Adapter {
             },
         )
         .await
-        .map_err(|err| Error::new(ErrorKind::Unexpected, "delete 
failed").set_source(err))?;
+        .map_err(parse_mysql_error)?;
         Ok(())
     }
 }
+
+fn parse_mysql_error(err: mysql_async::Error) -> Error {
+    Error::new(ErrorKind::Unexpected, "unhandled error from 
mysql").set_source(err)
+}
diff --git a/core/src/services/postgresql/backend.rs 
b/core/src/services/postgresql/backend.rs
index cbc4c7dbb..ac7b82a5a 100644
--- a/core/src/services/postgresql/backend.rs
+++ b/core/src/services/postgresql/backend.rs
@@ -222,7 +222,10 @@ impl Adapter {
                 // TODO: add tls support.
                 let manager =
                     PostgresConnectionManager::new(self.config.clone(), 
tokio_postgres::NoTls);
-                let pool = Pool::builder().build(manager).await?;
+                let pool = Pool::builder()
+                    .build(manager)
+                    .await
+                    .map_err(parse_postgre_error)?;
                 Ok(Arc::new(pool))
             })
             .await
@@ -249,11 +252,20 @@ impl kv::Adapter for Adapter {
             "SELECT {} FROM {} WHERE {} = $1 LIMIT 1",
             self.value_field, self.table, self.key_field
         );
-        let connection = self.get_client().await?.get().await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "unhandled error from 
postgresql").set_source(err)
-        })?;
-        let statement = connection.prepare(&query).await?;
-        let rows = connection.query(&statement, &[&path]).await?;
+        let connection = self
+            .get_client()
+            .await?
+            .get()
+            .await
+            .map_err(parse_bb8_error)?;
+        let statement = connection
+            .prepare(&query)
+            .await
+            .map_err(parse_postgre_error)?;
+        let rows = connection
+            .query(&statement, &[&path])
+            .await
+            .map_err(parse_postgre_error)?;
         if rows.is_empty() {
             return Ok(None);
         }
@@ -271,28 +283,48 @@ impl kv::Adapter for Adapter {
                 ON CONFLICT ({key_field}) \
                     DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
         );
-        let connection = self.get_client().await?.get().await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "unhandled error from 
postgresql").set_source(err)
-        })?;
-        let statement = connection.prepare(&query).await?;
-        let _ = connection.query(&statement, &[&path, &value]).await?;
+        let connection = self
+            .get_client()
+            .await?
+            .get()
+            .await
+            .map_err(parse_bb8_error)?;
+        let statement = connection
+            .prepare(&query)
+            .await
+            .map_err(parse_postgre_error)?;
+        let _ = connection
+            .query(&statement, &[&path, &value])
+            .await
+            .map_err(parse_postgre_error)?;
         Ok(())
     }
 
     async fn delete(&self, path: &str) -> Result<()> {
         let query = format!("DELETE FROM {} WHERE {} = $1", self.table, 
self.key_field);
-        let connection = self.get_client().await?.get().await.map_err(|err| {
-            Error::new(ErrorKind::Unexpected, "unhandled error from 
postgresql").set_source(err)
-        })?;
-        let statement = connection.prepare(&query).await?;
+        let connection = self
+            .get_client()
+            .await?
+            .get()
+            .await
+            .map_err(parse_bb8_error)?;
+        let statement = connection
+            .prepare(&query)
+            .await
+            .map_err(parse_postgre_error)?;
 
-        let _ = connection.query(&statement, &[&path]).await?;
+        let _ = connection
+            .query(&statement, &[&path])
+            .await
+            .map_err(parse_postgre_error)?;
         Ok(())
     }
 }
 
-impl From<tokio_postgres::Error> for Error {
-    fn from(value: tokio_postgres::Error) -> Error {
-        Error::new(ErrorKind::Unexpected, "unhandled error from 
postgresql").set_source(value)
-    }
+fn parse_bb8_error(err: bb8::RunError<tokio_postgres::Error>) -> Error {
+    Error::new(ErrorKind::Unexpected, "unhandled error from 
postgresql").set_source(err)
+}
+
+fn parse_postgre_error(err: tokio_postgres::Error) -> Error {
+    Error::new(ErrorKind::Unexpected, "unhandled error from 
postgresql").set_source(err)
 }

Reply via email to