NoxTav commented on code in PR #6328:
URL: https://github.com/apache/opendal/pull/6328#discussion_r2452069026


##########
core/src/services/sqlite/backend.rs:
##########
@@ -162,158 +155,218 @@ impl Builder for SqliteBuilder {
     }
 }
 
-pub type SqliteBackend = kv::Backend<Adapter>;
-
-#[derive(Debug, Clone)]
-pub struct Adapter {
-    pool: OnceCell<SqlitePool>,
-    config: SqliteConnectOptions,
+pub fn parse_sqlite_error(err: sqlx::Error) -> Error {
+    let is_temporary = matches!(
+        &err,
+        sqlx::Error::Database(db_err) if db_err.code().is_some_and(|c| c == "5" || c == "6")
+    );
 
-    table: String,
-    key_field: String,
-    value_field: String,
-}
+    let message = if is_temporary {
+        "database is locked or busy"
+    } else {
+        "unhandled error from sqlite"
+    };
 
-impl Adapter {
-    async fn get_client(&self) -> Result<&SqlitePool> {
-        self.pool
-            .get_or_try_init(|| async {
-                let pool = SqlitePool::connect_with(self.config.clone())
-                    .await
-                    .map_err(parse_sqlite_error)?;
-                Ok(pool)
-            })
-            .await
+    let mut error = Error::new(ErrorKind::Unexpected, message).set_source(err);
+    if is_temporary {
+        error = error.set_temporary();
     }
+    error
 }
 
-#[self_referencing]
-pub struct SqliteScanner {
-    pool: SqlitePool,
-    query: String,
-
-    #[borrows(pool, query)]
-    #[covariant]
-    stream: BoxStream<'this, Result<String>>,
+/// SqliteAccessor implements Access trait directly
+#[derive(Debug, Clone)]
+pub struct SqliteAccessor {
+    core: std::sync::Arc<SqliteCore>,
+    root: String,
+    info: std::sync::Arc<AccessorInfo>,
 }
 
-impl Stream for SqliteScanner {
-    type Item = Result<String>;
+impl SqliteAccessor {
+    fn new(core: SqliteCore) -> Self {
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::Sqlite.into());
+        info.set_name(&core.table);
+        info.set_root("/");
+        info.set_native_capability(Capability {
+            read: true,
+            write: true,
+            create_dir: true,
+            delete: true,
+            stat: true,
+            write_can_empty: true,
+            list: false,
+            shared: false,
+            ..Default::default()
+        });
+
+        Self {
+            core: std::sync::Arc::new(core),
+            root: "/".to_string(),
+            info: std::sync::Arc::new(info),
+        }
+    }
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        self.with_stream_mut(|s| s.poll_next_unpin(cx))
+    fn with_normalized_root(mut self, root: String) -> Self {
+        self.info.set_root(&root);
+        self.root = root;
+        self
     }
 }
 
-unsafe impl Sync for SqliteScanner {}
+impl Access for SqliteAccessor {
+    type Reader = Buffer;
+    type Writer = SqliteWriter;
+    type Lister = ();
+    type Deleter = oio::OneShotDeleter<SqliteDeleter>;
 
-impl kv::Scan for SqliteScanner {
-    async fn next(&mut self) -> Result<Option<String>> {
-        <Self as StreamExt>::next(self).await.transpose()
+    fn info(&self) -> std::sync::Arc<AccessorInfo> {
+        self.info.clone()
     }
-}
 
-impl kv::Adapter for Adapter {
-    type Scanner = SqliteScanner;
-
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::Sqlite,
-            &self.table,
-            Capability {
-                read: true,
-                write: true,
-                delete: true,
-                list: true,
-                shared: false,
-                ..Default::default()
-            },
-        )
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let p = build_abs_path(&self.root, path);
+
+        if p == build_abs_path(&self.root, "") {
+            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+        } else {
+            let bs = self.core.get(&p).await?;
+            match bs {
+                Some(bs) => Ok(RpStat::new(
+                    Metadata::new(EntryMode::from_path(&p)).with_content_length(bs.len() as u64),
+                )),
+                None => {
+                    // Check if this might be a directory by looking for keys with this prefix
+                    let dir_path = if p.ends_with('/') {
+                        p.clone()
+                    } else {
+                        format!("{}/", p)
+                    };
+                    let count: i64 = sqlx::query_scalar(&format!(
+                        "SELECT COUNT(*) FROM `{}` WHERE `{}` LIKE $1 LIMIT 1",
+                        self.core.table, self.core.key_field
+                    ))
+                    .bind(format!("{}%", dir_path))
+                    .fetch_one(self.core.get_client().await?)
+                    .await
+                    .map_err(crate::services::sqlite::backend::parse_sqlite_error)?;
+
+                    if count > 0 {
+                        // Directory exists (has children)
+                        Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+                    } else {
+                        Err(Error::new(ErrorKind::NotFound, "key not found in sqlite"))
+                    }
+                }
+            }
+        }
     }
 
-    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
-        let pool = self.get_client().await?;
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let p = build_abs_path(&self.root, path);
 
-        let value: Option<Vec<u8>> = sqlx::query_scalar(&format!(
-            "SELECT `{}` FROM `{}` WHERE `{}` = $1 LIMIT 1",
-            self.value_field, self.table, self.key_field
-        ))
-        .bind(path)
-        .fetch_optional(pool)
-        .await
-        .map_err(parse_sqlite_error)?;
+        let range = args.range();
+        let buffer = if range.is_full() {
+            // Full read - use GET
+            match self.core.get(&p).await? {
+                Some(bs) => bs,
+                None => return Err(Error::new(ErrorKind::NotFound, "key not found in sqlite")),
+            }
+        } else {
+            // Range read - use GETRANGE
+            let start = range.offset() as isize;
+            let limit = match range.size() {
+                Some(size) => size as isize,
+                None => -1, // Sqlite uses -1 for end of string
+            };
+
+            match self.core.get_range(&p, start, limit).await? {
+                Some(bs) => bs,
+                None => return Err(Error::new(ErrorKind::NotFound, "key not found in sqlite")),
+            }
+        };
 
-        Ok(value.map(Buffer::from))
+        Ok((RpRead::new(), buffer))
     }
 
-    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
-        let pool = self.get_client().await?;
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let p = build_abs_path(&self.root, path);
+        Ok((RpWrite::new(), SqliteWriter::new(self.core.clone(), &p)))
+    }
 
-        sqlx::query(&format!(
-            "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)",
-            self.table, self.key_field, self.value_field,
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        Ok((
+            RpDelete::default(),
+            oio::OneShotDeleter::new(SqliteDeleter::new(self.core.clone(), self.root.clone())),
         ))
-        .bind(path)
-        .bind(value.to_vec())
-        .execute(pool)
-        .await
-        .map_err(parse_sqlite_error)?;
-
-        Ok(())
     }
 
-    async fn delete(&self, path: &str) -> Result<()> {
-        let pool = self.get_client().await?;
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> {
+        let p = build_abs_path(&self.root, path);
 
-        sqlx::query(&format!(
-            "DELETE FROM `{}` WHERE `{}` = $1",
-            self.table, self.key_field
-        ))
-        .bind(path)
-        .execute(pool)
-        .await
-        .map_err(parse_sqlite_error)?;
+        // Ensure path ends with '/' for directory marker
+        let dir_path = if p.ends_with('/') {
+            p
+        } else {
+            format!("{}/", p)
+        };
+
+        // Store directory marker with empty content
+        self.core.set(&dir_path, Buffer::new()).await?;
 
-        Ok(())
+        Ok(RpCreateDir::default())
     }
+}
 
-    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
-        let pool = self.get_client().await?;
-        let stream = SqliteScannerBuilder {
-            pool: pool.clone(),
-            query: format!(
-                "SELECT `{}` FROM `{}` WHERE `{}` LIKE $1",
-                self.key_field, self.table, self.key_field
-            ),
-            stream_builder: |pool, query| {
-                sqlx::query_scalar(query)
-                    .bind(format!("{path}%"))
-                    .fetch(pool)
-                    .map(|v| v.map_err(parse_sqlite_error))
-                    .boxed()
-            },
-        }
-        .build();
+#[cfg(test)]
+mod test {
+    use super::*;
+    use sqlx::SqlitePool;
 
-        Ok(stream)
+    async fn build_client() -> OnceCell<SqlitePool> {
+        let file = tempfile::NamedTempFile::new().unwrap();
+        let config = SqliteConnectOptions::from_str(file.path().to_str().unwrap()).unwrap();

Review Comment:
   Fixed.



##########
core/Cargo.toml:
##########
@@ -428,6 +428,7 @@ rand = "0.8"
 sha2 = "0.10"
 size = "0.5"
 tokio = { version = "1.48", features = ["fs", "macros", "rt-multi-thread"] }
+tempfile = "3.20.0"

Review Comment:
   @Xuanwo Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to