This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 8c2b1a6b3 reafctor: Polish error process code for mysql/postgresql
services (#3269)
8c2b1a6b3 is described below
commit 8c2b1a6b3f9473130db21c6751d85ca710983402
Author: Nadeshiko Manju <[email protected]>
AuthorDate: Fri Oct 13 16:51:52 2023 +0800
reafctor: Polish error process code for mysql/postgresql services (#3269)
* reafctor: Polish error process code for mysql/postgresql services
Signed-off-by: Manjusaka <[email protected]>
* Update code
Signed-off-by: Manjusaka <[email protected]>
---------
Signed-off-by: Manjusaka <[email protected]>
---
core/src/services/mysql/backend.rs | 46 +++++++++++----------
core/src/services/postgresql/backend.rs | 72 ++++++++++++++++++++++++---------
2 files changed, 77 insertions(+), 41 deletions(-)
diff --git a/core/src/services/mysql/backend.rs
b/core/src/services/mysql/backend.rs
index d6d8c9966..bc73864b4 100644
--- a/core/src/services/mysql/backend.rs
+++ b/core/src/services/mysql/backend.rs
@@ -222,12 +222,12 @@ impl kv::Adapter for Adapter {
"SELECT `{}` FROM `{}` WHERE `{}` = :path LIMIT 1",
self.value_field, self.table, self.key_field
);
- let mut conn = self.connection_pool.get_conn().await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "connection
failed").set_source(err)
- })?;
- let statement = conn.prep(query).await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "prepare statement
failed").set_source(err)
- })?;
+ let mut conn = self
+ .connection_pool
+ .get_conn()
+ .await
+ .map_err(parse_mysql_error)?;
+ let statement = conn.prep(query).await.map_err(parse_mysql_error)?;
let result: Option<Vec<u8>> = conn
.exec_first(
statement,
@@ -236,7 +236,7 @@ impl kv::Adapter for Adapter {
},
)
.await
- .map_err(|err| Error::new(ErrorKind::Unexpected, "delete
failed").set_source(err))?;
+ .map_err(parse_mysql_error)?;
match result {
Some(v) => Ok(Some(v)),
None => Ok(None),
@@ -250,12 +250,12 @@ impl kv::Adapter for Adapter {
ON DUPLICATE KEY UPDATE `{}` = VALUES({})",
self.table, self.key_field, self.value_field, self.value_field,
self.value_field
);
- let mut conn = self.connection_pool.get_conn().await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "connection
failed").set_source(err)
- })?;
- let statement = conn.prep(query).await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "prepare statement
failed").set_source(err)
- })?;
+ let mut conn = self
+ .connection_pool
+ .get_conn()
+ .await
+ .map_err(parse_mysql_error)?;
+ let statement = conn.prep(query).await.map_err(parse_mysql_error)?;
conn.exec_drop(
statement,
@@ -265,7 +265,7 @@ impl kv::Adapter for Adapter {
},
)
.await
- .map_err(|err| Error::new(ErrorKind::Unexpected, "set
failed").set_source(err))?;
+ .map_err(parse_mysql_error)?;
Ok(())
}
@@ -274,12 +274,12 @@ impl kv::Adapter for Adapter {
"DELETE FROM `{}` WHERE `{}` = :path",
self.table, self.key_field
);
- let mut conn = self.connection_pool.get_conn().await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "connection
failed").set_source(err)
- })?;
- let statement = conn.prep(query).await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "prepare statement
failed").set_source(err)
- })?;
+ let mut conn = self
+ .connection_pool
+ .get_conn()
+ .await
+ .map_err(parse_mysql_error)?;
+ let statement = conn.prep(query).await.map_err(parse_mysql_error)?;
conn.exec_drop(
statement,
@@ -288,7 +288,11 @@ impl kv::Adapter for Adapter {
},
)
.await
- .map_err(|err| Error::new(ErrorKind::Unexpected, "delete
failed").set_source(err))?;
+ .map_err(parse_mysql_error)?;
Ok(())
}
}
+
+fn parse_mysql_error(err: mysql_async::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "unhandled error from
mysql").set_source(err)
+}
diff --git a/core/src/services/postgresql/backend.rs
b/core/src/services/postgresql/backend.rs
index cbc4c7dbb..ac7b82a5a 100644
--- a/core/src/services/postgresql/backend.rs
+++ b/core/src/services/postgresql/backend.rs
@@ -222,7 +222,10 @@ impl Adapter {
// TODO: add tls support.
let manager =
PostgresConnectionManager::new(self.config.clone(),
tokio_postgres::NoTls);
- let pool = Pool::builder().build(manager).await?;
+ let pool = Pool::builder()
+ .build(manager)
+ .await
+ .map_err(parse_postgre_error)?;
Ok(Arc::new(pool))
})
.await
@@ -249,11 +252,20 @@ impl kv::Adapter for Adapter {
"SELECT {} FROM {} WHERE {} = $1 LIMIT 1",
self.value_field, self.table, self.key_field
);
- let connection = self.get_client().await?.get().await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "unhandled error from
postgresql").set_source(err)
- })?;
- let statement = connection.prepare(&query).await?;
- let rows = connection.query(&statement, &[&path]).await?;
+ let connection = self
+ .get_client()
+ .await?
+ .get()
+ .await
+ .map_err(parse_bb8_error)?;
+ let statement = connection
+ .prepare(&query)
+ .await
+ .map_err(parse_postgre_error)?;
+ let rows = connection
+ .query(&statement, &[&path])
+ .await
+ .map_err(parse_postgre_error)?;
if rows.is_empty() {
return Ok(None);
}
@@ -271,28 +283,48 @@ impl kv::Adapter for Adapter {
ON CONFLICT ({key_field}) \
DO UPDATE SET {value_field} = EXCLUDED.{value_field}",
);
- let connection = self.get_client().await?.get().await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "unhandled error from
postgresql").set_source(err)
- })?;
- let statement = connection.prepare(&query).await?;
- let _ = connection.query(&statement, &[&path, &value]).await?;
+ let connection = self
+ .get_client()
+ .await?
+ .get()
+ .await
+ .map_err(parse_bb8_error)?;
+ let statement = connection
+ .prepare(&query)
+ .await
+ .map_err(parse_postgre_error)?;
+ let _ = connection
+ .query(&statement, &[&path, &value])
+ .await
+ .map_err(parse_postgre_error)?;
Ok(())
}
async fn delete(&self, path: &str) -> Result<()> {
let query = format!("DELETE FROM {} WHERE {} = $1", self.table,
self.key_field);
- let connection = self.get_client().await?.get().await.map_err(|err| {
- Error::new(ErrorKind::Unexpected, "unhandled error from
postgresql").set_source(err)
- })?;
- let statement = connection.prepare(&query).await?;
+ let connection = self
+ .get_client()
+ .await?
+ .get()
+ .await
+ .map_err(parse_bb8_error)?;
+ let statement = connection
+ .prepare(&query)
+ .await
+ .map_err(parse_postgre_error)?;
- let _ = connection.query(&statement, &[&path]).await?;
+ let _ = connection
+ .query(&statement, &[&path])
+ .await
+ .map_err(parse_postgre_error)?;
Ok(())
}
}
-impl From<tokio_postgres::Error> for Error {
- fn from(value: tokio_postgres::Error) -> Error {
- Error::new(ErrorKind::Unexpected, "unhandled error from
postgresql").set_source(value)
- }
+fn parse_bb8_error(err: bb8::RunError<tokio_postgres::Error>) -> Error {
+ Error::new(ErrorKind::Unexpected, "unhandled error from
postgresql").set_source(err)
+}
+
+fn parse_postgre_error(err: tokio_postgres::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "unhandled error from
postgresql").set_source(err)
}