This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new e2be53963b feat: Support NebulaGraph (#5116)
e2be53963b is described below

commit e2be53963baf77c0169ac2748fc911c745357229
Author: feathercyc <[email protected]>
AuthorDate: Wed Oct 16 23:28:21 2024 +0800

    feat: Support NebulaGraph (#5116)
    
    Co-authored-by: feathercyc <[email protected]>
---
 core/Cargo.lock                           | 114 ++++++++++++
 core/Cargo.toml                           |   5 +-
 core/src/services/mod.rs                  |   3 +
 core/src/services/nebula_graph/backend.rs | 284 ++++++++++++++++++++++++++++++
 core/src/types/operator/builder.rs        |   2 +
 core/src/types/scheme.rs                  |   6 +
 6 files changed, 413 insertions(+), 1 deletion(-)

diff --git a/core/Cargo.lock b/core/Cargo.lock
index eacc8122a3..b15064c988 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -268,6 +268,19 @@ dependencies = [
  "pin-project-lite",
 ]
 
+[[package]]
+name = "async-compat"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7bab94bde396a3f7b4962e396fdad640e241ed797d4d8d77fc8c237d14c58fc0"
+dependencies = [
+ "futures-core",
+ "futures-io",
+ "once_cell",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "async-executor"
 version = "1.13.1"
@@ -348,6 +361,16 @@ dependencies = [
  "syn 2.0.77",
 ]
 
+[[package]]
+name = "async-sleep"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c327a532ed3acb8ad885b50bb2ea5fc7c132a396dd990cf855d2825fbdc16c6c"
+dependencies = [
+ "futures-util",
+ "tokio",
+]
+
 [[package]]
 name = "async-std"
 version = "1.13.0"
@@ -1031,6 +1054,12 @@ version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"
 
+[[package]]
+name = "base64"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
+
 [[package]]
 name = "base64"
 version = "0.13.1"
@@ -1280,6 +1309,15 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "bufsize"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7864afba28009cd99a4d973c3de89cc766b800cdf1bd909966d454906f3bce5d"
+dependencies = [
+ "bytes",
+]
+
 [[package]]
 name = "bumpalo"
 version = "3.16.0"
@@ -1331,6 +1369,9 @@ name = "bytes"
 version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
+dependencies = [
+ "serde",
+]
 
 [[package]]
 name = "bytes-utils"
@@ -1819,6 +1860,12 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "const-cstr"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed3d0b5ff30645a68f35ece8cea4556ca14ef8a1651455f789a099a0513532a6"
+
 [[package]]
 name = "const-oid"
 version = "0.9.6"
@@ -3080,6 +3127,17 @@ dependencies = [
  "wasm-bindgen",
 ]
 
+[[package]]
+name = "ghost"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0e085ded9f1267c32176b40921b9754c474f7dd96f7e808d4a982e48aa1e854"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.77",
+]
+
 [[package]]
 name = "gimli"
 version = "0.31.0"
@@ -4677,12 +4735,14 @@ dependencies = [
  "reqsign",
  "reqwest 0.12.7",
  "rocksdb",
+ "rust-nebula",
  "serde",
  "serde_json",
  "sha1",
  "sha2",
  "size",
  "sled",
+ "snowflaked",
  "sqlx",
  "suppaftp",
  "surrealdb",
@@ -4963,6 +5023,16 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "ordered-float"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7"
+dependencies = [
+ "num-traits",
+ "serde",
+]
+
 [[package]]
 name = "ordered-multimap"
 version = "0.7.3"
@@ -5006,6 +5076,12 @@ dependencies = [
  "sha2",
 ]
 
+[[package]]
+name = "panic-message"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d"
+
 [[package]]
 name = "parking"
 version = "2.2.1"
@@ -6336,6 +6412,35 @@ dependencies = [
  "trim-in-place",
 ]
 
+[[package]]
+name = "rust-nebula"
+version = "0.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11a94ea754ca8b05b71ae911b7035180861cebb607c711acd766d14c04f87ae9"
+dependencies = [
+ "anyhow",
+ "async-compat",
+ "async-sleep",
+ "async-trait",
+ "base64 0.11.0",
+ "bb8",
+ "bufsize",
+ "bytes",
+ "const-cstr",
+ "futures",
+ "futures-util",
+ "ghost",
+ "num-derive",
+ "num-traits",
+ "ordered-float",
+ "panic-message",
+ "serde",
+ "serde_json",
+ "thiserror",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "rust-stemmers"
 version = "1.2.0"
@@ -6956,6 +7061,15 @@ version = "1.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
 
+[[package]]
+name = "snowflaked"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "398d462c4c454399be452039b24b0aa0ecb4c7a57f6ae615f5d25de2b032f850"
+dependencies = [
+ "loom",
+]
+
 [[package]]
 name = "socket2"
 version = "0.5.7"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 6b4e5d592d..323d6087c9 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -166,6 +166,7 @@ services-moka = ["dep:moka"]
 services-mongodb = ["dep:mongodb"]
 services-monoiofs = ["dep:monoio", "dep:flume"]
 services-mysql = ["dep:sqlx", "sqlx?/mysql"]
+services-nebula-graph = ["dep:rust-nebula", "dep:bb8", "dep:snowflaked"]
 services-obs = [
   "dep:reqsign",
   "reqsign?/services-huaweicloud",
@@ -204,7 +205,6 @@ services-vercel-blob = []
 services-webdav = []
 services-webhdfs = []
 services-yandex-disk = []
-services-nebula-graph = []
 
 [lib]
 bench = false
@@ -346,6 +346,9 @@ compio = { version = "0.11.0", optional = true, features = [
 ] }
 # for services-s3
 crc32c = { version = "0.6.6", optional = true }
+# for services-nebula-graph
+rust-nebula = { version = "^0.0.2", optional = true, features = ["graph"] }
+snowflaked = { version = "1", optional = true, features = ["sync"] }
 # for services-monoiofs
 flume = { version = "0.11", optional = true }
 monoio = { version = "0.2.4", optional = true, features = [
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 753838b313..0437dff4a7 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -145,6 +145,9 @@ pub use monoiofs::*;
 mod mysql;
 pub use self::mysql::*;
 
+mod nebula_graph;
+pub use nebula_graph::*;
+
 mod obs;
 pub use obs::*;
 
diff --git a/core/src/services/nebula_graph/backend.rs b/core/src/services/nebula_graph/backend.rs
index 6ae3ebb8ea..d03dd5bf2b 100644
--- a/core/src/services/nebula_graph/backend.rs
+++ b/core/src/services/nebula_graph/backend.rs
@@ -17,7 +17,31 @@
 
 use std::fmt::Debug;
 
+#[cfg(feature = "tests")]
+use std::time::Duration;
+
+use base64::engine::general_purpose::STANDARD as BASE64;
+use base64::engine::Engine as _;
+use bb8::{PooledConnection, RunError};
+use rust_nebula::{
+    graph::GraphQuery, HostAddress, SingleConnSessionConf, 
SingleConnSessionManager,
+};
+use snowflaked::sync::Generator;
+use tokio::sync::OnceCell;
+
+use crate::raw::adapters::kv;
+use crate::raw::*;
 use crate::services::NebulaGraphConfig;
+use crate::*;
+
+static GENERATOR: Generator = Generator::new(0);
+
+impl Configurator for NebulaGraphConfig {
+    type Builder = NebulaGraphBuilder;
+    fn into_builder(self) -> Self::Builder {
+        NebulaGraphBuilder { config: self }
+    }
+}
 
 #[doc = include_str!("docs.md")]
 #[derive(Default)]
@@ -110,3 +134,263 @@ impl NebulaGraphBuilder {
         self
     }
 }
+
+impl Builder for NebulaGraphBuilder {
+    const SCHEME: Scheme = Scheme::NebulaGraph;
+    type Config = NebulaGraphConfig;
+
+    fn build(self) -> Result<impl Access> {
+        let host = match self.config.host.clone() {
+            Some(v) => v,
+            None => {
+                return Err(Error::new(ErrorKind::ConfigInvalid, "host is 
empty")
+                    .with_context("service", Scheme::NebulaGraph))
+            }
+        };
+        let port = match self.config.port {
+            Some(v) => v,
+            None => {
+                return Err(Error::new(ErrorKind::ConfigInvalid, "port is 
empty")
+                    .with_context("service", Scheme::NebulaGraph))
+            }
+        };
+        let username = match self.config.username.clone() {
+            Some(v) => v,
+            None => {
+                return Err(Error::new(ErrorKind::ConfigInvalid, "username is 
empty")
+                    .with_context("service", Scheme::NebulaGraph))
+            }
+        };
+        let password = match self.config.password.clone() {
+            Some(v) => v,
+            None => "".to_string(),
+        };
+        let space = match self.config.space.clone() {
+            Some(v) => v,
+            None => {
+                return Err(Error::new(ErrorKind::ConfigInvalid, "space is 
empty")
+                    .with_context("service", Scheme::NebulaGraph))
+            }
+        };
+        let tag = match self.config.tag.clone() {
+            Some(v) => v,
+            None => {
+                return Err(Error::new(ErrorKind::ConfigInvalid, "tag is empty")
+                    .with_context("service", Scheme::NebulaGraph))
+            }
+        };
+        let key_field = match self.config.key_field.clone() {
+            Some(v) => v,
+            None => "key".to_string(),
+        };
+        let value_field = match self.config.value_field.clone() {
+            Some(v) => v,
+            None => "value".to_string(),
+        };
+        let root = normalize_root(
+            self.config
+                .root
+                .clone()
+                .unwrap_or_else(|| "/".to_string())
+                .as_str(),
+        );
+
+        let mut session_config = SingleConnSessionConf::new(
+            vec![HostAddress::new(&host, port)],
+            username,
+            password,
+            Some(space),
+        );
+        // NebulaGraph uses fbthrift for communication. fbthrift's max_buffer_size defaults to 4 KB,
+        // which is too small to store anything useful.
+        // So we set buf_size to 1 MB and max_buffer_size to 64 MB so that NebulaGraph can store files with filesize < 1 MB at least.
+        session_config.set_buf_size(1024 * 1024);
+        session_config.set_max_buf_size(64 * 1024 * 1024);
+        session_config.set_max_parse_response_bytes_count(254);
+
+        Ok(NebulaGraphBackend::new(Adapter {
+            session_pool: OnceCell::new(),
+            session_config,
+
+            tag,
+            key_field,
+            value_field,
+        })
+        .with_root(root.as_str()))
+    }
+}
+
+/// Backend for NebulaGraph service
+pub type NebulaGraphBackend = kv::Backend<Adapter>;
+
+#[derive(Clone)]
+pub struct Adapter {
+    session_pool: OnceCell<bb8::Pool<SingleConnSessionManager>>,
+    session_config: SingleConnSessionConf,
+
+    tag: String,
+    key_field: String,
+    value_field: String,
+}
+
+impl Debug for Adapter {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Adapter")
+            .field("session_config", &self.session_config)
+            .field("tag", &self.tag)
+            .field("key_field", &self.key_field)
+            .field("value_field", &self.value_field)
+            .finish()
+    }
+}
+
+impl Adapter {
+    async fn get_session(&self) -> 
Result<PooledConnection<SingleConnSessionManager>> {
+        let session_pool = self
+            .session_pool
+            .get_or_try_init(|| async {
+                bb8::Pool::builder()
+                    .max_size(64)
+                    
.build(SingleConnSessionManager::new(self.session_config.clone()))
+                    .await
+            })
+            .await
+            .map_err(|err| Error::new(ErrorKind::Unexpected, format!("{}", 
err)).set_temporary())?;
+
+        session_pool.get().await.map_err(|err| match err {
+            RunError::User(err) => {
+                Error::new(ErrorKind::Unexpected, format!("{}", 
err)).set_temporary()
+            }
+            RunError::TimedOut => {
+                Error::new(ErrorKind::Unexpected, "connection request: 
timeout").set_temporary()
+            }
+        })
+    }
+}
+
+impl kv::Adapter for Adapter {
+    fn metadata(&self) -> kv::Metadata {
+        kv::Metadata::new(
+            Scheme::NebulaGraph,
+            &self.session_config.space.clone().unwrap(),
+            Capability {
+                read: true,
+                write: true,
+                write_total_max_size: Some(1024 * 1024),
+                write_can_empty: true,
+                delete: true,
+                list: true,
+                ..Default::default()
+            },
+        )
+    }
+
+    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
+        let path = path.replace("'", "\\'").replace('"', "\\\"");
+        let query = format!(
+            "LOOKUP ON {} WHERE {}.{} == '{}' YIELD properties(vertex).{} AS 
{};",
+            self.tag, self.tag, self.key_field, path, self.value_field, 
self.value_field
+        );
+        let mut sess = self.get_session().await?;
+        let result = sess
+            .query(&query)
+            .await
+            .map_err(parse_nebulagraph_session_error)?;
+        if result.is_empty() {
+            Ok(None)
+        } else {
+            let row = result
+                .get_row_values_by_index(0)
+                .map_err(parse_nebulagraph_dataset_error)?;
+            let value = row
+                .get_value_by_col_name(&self.value_field)
+                .map_err(parse_nebulagraph_dataset_error)?;
+            let base64_str = 
value.as_string().map_err(parse_nebulagraph_dataset_error)?;
+            let value_str = BASE64.decode(base64_str).map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "unhandled error from 
nebulagraph")
+                    .set_source(err)
+            })?;
+            let buf = Buffer::from(value_str);
+            Ok(Some(buf))
+        }
+    }
+
+    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
+        #[cfg(feature = "tests")]
+        let path_copy = path;
+
+        self.delete(path).await?;
+        let path = path.replace("'", "\\'").replace('"', "\\\"");
+        let file = value.to_vec();
+        let file = BASE64.encode(&file);
+        let snowflake_id: u64 = GENERATOR.generate();
+        let query = format!(
+            "INSERT VERTEX {} VALUES {}:('{}', '{}');",
+            self.tag, snowflake_id, path, file
+        );
+        let mut sess = self.get_session().await?;
+        sess.execute(&query)
+            .await
+            .map_err(parse_nebulagraph_session_error)?;
+
+        // To pass tests, poll until NebulaGraph returns the data we just inserted, confirming the insert succeeded
+        #[cfg(feature = "tests")]
+        loop {
+            let v = self.get(path_copy).await.unwrap();
+            if v.is_none() {
+                std::thread::sleep(Duration::from_millis(1000));
+            } else {
+                break;
+            }
+        }
+        Ok(())
+    }
+
+    async fn delete(&self, path: &str) -> Result<()> {
+        let path = path.replace("'", "\\'").replace('"', "\\\"");
+        let query = format!(
+            "LOOKUP ON {} WHERE {}.{} == '{}' YIELD id(vertex) AS id | DELETE 
VERTEX $-.id;",
+            self.tag, self.tag, self.key_field, path
+        );
+        let mut sess = self.get_session().await?;
+        sess.execute(&query)
+            .await
+            .map_err(parse_nebulagraph_session_error)?;
+        Ok(())
+    }
+
+    async fn scan(&self, path: &str) -> Result<Vec<String>> {
+        let path = path.replace("'", "\\'").replace('"', "\\\"");
+        let query = format!(
+            "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD 
properties(vertex).{} AS {};",
+            self.tag, self.tag, self.key_field, path, self.key_field, 
self.key_field
+        );
+
+        let mut sess = self.get_session().await?;
+        let result = sess
+            .query(&query)
+            .await
+            .map_err(parse_nebulagraph_session_error)?;
+        let mut res_vec = vec![];
+        for row_i in 0..result.get_row_size() {
+            let row = result
+                .get_row_values_by_index(row_i)
+                .map_err(parse_nebulagraph_dataset_error)?;
+            let value = row
+                .get_value_by_col_name(&self.key_field)
+                .map_err(parse_nebulagraph_dataset_error)?;
+            let sub_path = 
value.as_string().map_err(parse_nebulagraph_dataset_error)?;
+
+            res_vec.push(sub_path);
+        }
+        Ok(res_vec)
+    }
+}
+
+fn parse_nebulagraph_session_error(err: rust_nebula::SingleConnSessionError) 
-> Error {
+    Error::new(ErrorKind::Unexpected, "unhandled error from 
nebulagraph").set_source(err)
+}
+
+fn parse_nebulagraph_dataset_error(err: rust_nebula::DataSetError) -> Error {
+    Error::new(ErrorKind::Unexpected, "unhandled error from 
nebulagraph").set_source(err)
+}
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 4691eda6b2..95dfa1c17a 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -292,6 +292,8 @@ impl Operator {
             Scheme::HdfsNative => 
Self::from_iter::<services::HdfsNative>(iter)?.finish(),
             #[cfg(feature = "services-lakefs")]
             Scheme::Lakefs => 
Self::from_iter::<services::Lakefs>(iter)?.finish(),
+            #[cfg(feature = "services-nebula-graph")]
+            Scheme::NebulaGraph => 
Self::from_iter::<services::NebulaGraph>(iter)?.finish(),
             v => {
                 return Err(Error::new(
                     ErrorKind::Unsupported,
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 4dd8acc69f..c0da5219b8 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -165,6 +165,8 @@ pub enum Scheme {
     Surrealdb,
     /// [lakefs](crate::services::Lakefs): LakeFS Services
     Lakefs,
+    /// [NebulaGraph](crate::services::NebulaGraph): NebulaGraph Services
+    NebulaGraph,
     /// Custom that allow users to implement services outside of OpenDAL.
     ///
     /// # NOTE
@@ -315,6 +317,8 @@ impl Scheme {
             Scheme::Surrealdb,
             #[cfg(feature = "services-lakefs")]
             Scheme::Lakefs,
+            #[cfg(feature = "services-nebula-graph")]
+            Scheme::NebulaGraph,
         ])
     }
 }
@@ -406,6 +410,7 @@ impl FromStr for Scheme {
             "hdfs_native" => Ok(Scheme::HdfsNative),
             "surrealdb" => Ok(Scheme::Surrealdb),
             "lakefs" => Ok(Scheme::Lakefs),
+            "nebula_graph" => Ok(Scheme::NebulaGraph),
             _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))),
         }
     }
@@ -480,6 +485,7 @@ impl From<Scheme> for &'static str {
             Scheme::HdfsNative => "hdfs_native",
             Scheme::Surrealdb => "surrealdb",
             Scheme::Lakefs => "lakefs",
+            Scheme::NebulaGraph => "nebula_graph",
             Scheme::Custom(v) => v,
         }
     }

Reply via email to