This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new e2be53963b feat: Support NebulaGraph (#5116)
e2be53963b is described below
commit e2be53963baf77c0169ac2748fc911c745357229
Author: feathercyc <[email protected]>
AuthorDate: Wed Oct 16 23:28:21 2024 +0800
feat: Support NebulaGraph (#5116)
Co-authored-by: feathercyc <[email protected]>
---
core/Cargo.lock | 114 ++++++++++++
core/Cargo.toml | 5 +-
core/src/services/mod.rs | 3 +
core/src/services/nebula_graph/backend.rs | 284 ++++++++++++++++++++++++++++++
core/src/types/operator/builder.rs | 2 +
core/src/types/scheme.rs | 6 +
6 files changed, 413 insertions(+), 1 deletion(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index eacc8122a3..b15064c988 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -268,6 +268,19 @@ dependencies = [
"pin-project-lite",
]
+[[package]]
+name = "async-compat"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7bab94bde396a3f7b4962e396fdad640e241ed797d4d8d77fc8c237d14c58fc0"
+dependencies = [
+ "futures-core",
+ "futures-io",
+ "once_cell",
+ "pin-project-lite",
+ "tokio",
+]
+
[[package]]
name = "async-executor"
version = "1.13.1"
@@ -348,6 +361,16 @@ dependencies = [
"syn 2.0.77",
]
+[[package]]
+name = "async-sleep"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c327a532ed3acb8ad885b50bb2ea5fc7c132a396dd990cf855d2825fbdc16c6c"
+dependencies = [
+ "futures-util",
+ "tokio",
+]
+
[[package]]
name = "async-std"
version = "1.13.0"
@@ -1031,6 +1054,12 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "349a06037c7bf932dd7e7d1f653678b2038b9ad46a74102f1fc7bd7872678cce"
+[[package]]
+name = "base64"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
+
[[package]]
name = "base64"
version = "0.13.1"
@@ -1280,6 +1309,15 @@ dependencies = [
"uuid",
]
+[[package]]
+name = "bufsize"
+version = "1.0.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7864afba28009cd99a4d973c3de89cc766b800cdf1bd909966d454906f3bce5d"
+dependencies = [
+ "bytes",
+]
+
[[package]]
name = "bumpalo"
version = "3.16.0"
@@ -1331,6 +1369,9 @@ name = "bytes"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
+dependencies = [
+ "serde",
+]
[[package]]
name = "bytes-utils"
@@ -1819,6 +1860,12 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "const-cstr"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ed3d0b5ff30645a68f35ece8cea4556ca14ef8a1651455f789a099a0513532a6"
+
[[package]]
name = "const-oid"
version = "0.9.6"
@@ -3080,6 +3127,17 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "ghost"
+version = "0.1.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0e085ded9f1267c32176b40921b9754c474f7dd96f7e808d4a982e48aa1e854"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.77",
+]
+
[[package]]
name = "gimli"
version = "0.31.0"
@@ -4677,12 +4735,14 @@ dependencies = [
"reqsign",
"reqwest 0.12.7",
"rocksdb",
+ "rust-nebula",
"serde",
"serde_json",
"sha1",
"sha2",
"size",
"sled",
+ "snowflaked",
"sqlx",
"suppaftp",
"surrealdb",
@@ -4963,6 +5023,16 @@ dependencies = [
"thiserror",
]
+[[package]]
+name = "ordered-float"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3305af35278dd29f46fcdd139e0b1fbfae2153f0e5928b39b035542dd31e37b7"
+dependencies = [
+ "num-traits",
+ "serde",
+]
+
[[package]]
name = "ordered-multimap"
version = "0.7.3"
@@ -5006,6 +5076,12 @@ dependencies = [
"sha2",
]
+[[package]]
+name = "panic-message"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "384e52fd8fbd4cbe3c317e8216260c21a0f9134de108cea8a4dd4e7e152c472d"
+
[[package]]
name = "parking"
version = "2.2.1"
@@ -6336,6 +6412,35 @@ dependencies = [
"trim-in-place",
]
+[[package]]
+name = "rust-nebula"
+version = "0.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11a94ea754ca8b05b71ae911b7035180861cebb607c711acd766d14c04f87ae9"
+dependencies = [
+ "anyhow",
+ "async-compat",
+ "async-sleep",
+ "async-trait",
+ "base64 0.11.0",
+ "bb8",
+ "bufsize",
+ "bytes",
+ "const-cstr",
+ "futures",
+ "futures-util",
+ "ghost",
+ "num-derive",
+ "num-traits",
+ "ordered-float",
+ "panic-message",
+ "serde",
+ "serde_json",
+ "thiserror",
+ "tokio",
+ "tracing",
+]
+
[[package]]
name = "rust-stemmers"
version = "1.2.0"
@@ -6956,6 +7061,15 @@ version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"
+[[package]]
+name = "snowflaked"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "398d462c4c454399be452039b24b0aa0ecb4c7a57f6ae615f5d25de2b032f850"
+dependencies = [
+ "loom",
+]
+
[[package]]
name = "socket2"
version = "0.5.7"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 6b4e5d592d..323d6087c9 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -166,6 +166,7 @@ services-moka = ["dep:moka"]
services-mongodb = ["dep:mongodb"]
services-monoiofs = ["dep:monoio", "dep:flume"]
services-mysql = ["dep:sqlx", "sqlx?/mysql"]
+services-nebula-graph = ["dep:rust-nebula", "dep:bb8", "dep:snowflaked"]
services-obs = [
"dep:reqsign",
"reqsign?/services-huaweicloud",
@@ -204,7 +205,6 @@ services-vercel-blob = []
services-webdav = []
services-webhdfs = []
services-yandex-disk = []
-services-nebula-graph = []
[lib]
bench = false
@@ -346,6 +346,9 @@ compio = { version = "0.11.0", optional = true, features = [
] }
# for services-s3
crc32c = { version = "0.6.6", optional = true }
+# for services-nebula-graph
+rust-nebula = { version = "^0.0.2", optional = true, features = ["graph"] }
+snowflaked = { version = "1", optional = true, features = ["sync"] }
# for services-monoiofs
flume = { version = "0.11", optional = true }
monoio = { version = "0.2.4", optional = true, features = [
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 753838b313..0437dff4a7 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -145,6 +145,9 @@ pub use monoiofs::*;
mod mysql;
pub use self::mysql::*;
+mod nebula_graph;
+pub use nebula_graph::*;
+
mod obs;
pub use obs::*;
diff --git a/core/src/services/nebula_graph/backend.rs b/core/src/services/nebula_graph/backend.rs
index 6ae3ebb8ea..d03dd5bf2b 100644
--- a/core/src/services/nebula_graph/backend.rs
+++ b/core/src/services/nebula_graph/backend.rs
@@ -17,7 +17,31 @@
use std::fmt::Debug;
+#[cfg(feature = "tests")]
+use std::time::Duration;
+
+use base64::engine::general_purpose::STANDARD as BASE64;
+use base64::engine::Engine as _;
+use bb8::{PooledConnection, RunError};
+use rust_nebula::{
+ graph::GraphQuery, HostAddress, SingleConnSessionConf,
SingleConnSessionManager,
+};
+use snowflaked::sync::Generator;
+use tokio::sync::OnceCell;
+
+use crate::raw::adapters::kv;
+use crate::raw::*;
use crate::services::NebulaGraphConfig;
+use crate::*;
+
+static GENERATOR: Generator = Generator::new(0);
+
+impl Configurator for NebulaGraphConfig {
+    type Builder = NebulaGraphBuilder;
+
+    /// Wraps this configuration, unchanged, in a [`NebulaGraphBuilder`].
+    fn into_builder(self) -> Self::Builder {
+        let config = self;
+        NebulaGraphBuilder { config }
+    }
+}
#[doc = include_str!("docs.md")]
#[derive(Default)]
@@ -110,3 +134,263 @@ impl NebulaGraphBuilder {
self
}
}
+
+impl Builder for NebulaGraphBuilder {
+    const SCHEME: Scheme = Scheme::NebulaGraph;
+    type Config = NebulaGraphConfig;
+
+    /// Validates the configuration and assembles the NebulaGraph backend.
+    ///
+    /// `host`, `port`, `username`, `space` and `tag` are required. `password`
+    /// falls back to an empty string, `key_field` to `"key"`, `value_field`
+    /// to `"value"` and `root` to `"/"`.
+    ///
+    /// No connection is opened here: the adapter is created with an empty
+    /// session pool cell.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::ConfigInvalid`] when a required option is missing.
+    fn build(self) -> Result<impl Access> {
+        // Every missing required option is reported in the same shape.
+        let missing = |option: &str| {
+            Error::new(ErrorKind::ConfigInvalid, format!("{} is empty", option))
+                .with_context("service", Scheme::NebulaGraph)
+        };
+
+        let host = self.config.host.clone().ok_or_else(|| missing("host"))?;
+        let port = self.config.port.ok_or_else(|| missing("port"))?;
+        let username = self
+            .config
+            .username
+            .clone()
+            .ok_or_else(|| missing("username"))?;
+        let password = self.config.password.clone().unwrap_or_default();
+        let space = self.config.space.clone().ok_or_else(|| missing("space"))?;
+        let tag = self.config.tag.clone().ok_or_else(|| missing("tag"))?;
+        let key_field = self
+            .config
+            .key_field
+            .clone()
+            .unwrap_or_else(|| "key".to_string());
+        let value_field = self
+            .config
+            .value_field
+            .clone()
+            .unwrap_or_else(|| "value".to_string());
+        let root = normalize_root(self.config.root.as_deref().unwrap_or("/"));
+
+        let mut session_config = SingleConnSessionConf::new(
+            vec![HostAddress::new(&host, port)],
+            username,
+            password,
+            Some(space),
+        );
+        // NebulaGraph uses fbthrift for communication. According to the
+        // original author's note, fbthrift's default maximum buffer size is
+        // 4 KB, which is too small to carry file contents. Start with a 1 MiB
+        // buffer and allow it to grow up to 64 MiB.
+        session_config.set_buf_size(1024 * 1024);
+        session_config.set_max_buf_size(64 * 1024 * 1024);
+        // NOTE(review): the meaning of 254 is not visible here; confirm
+        // against the rust-nebula documentation.
+        session_config.set_max_parse_response_bytes_count(254);
+
+        Ok(NebulaGraphBackend::new(Adapter {
+            session_pool: OnceCell::new(),
+            session_config,
+
+            tag,
+            key_field,
+            value_field,
+        })
+        .with_root(root.as_str()))
+    }
+}
+
+/// Backend for NebulaGraph service
+pub type NebulaGraphBackend = kv::Backend<Adapter>;
+
+/// Key-value adapter that stores each entry as a NebulaGraph vertex.
+#[derive(Clone)]
+pub struct Adapter {
+ /// Session pool; starts empty and is filled by `get_session`.
+ session_pool: OnceCell<bb8::Pool<SingleConnSessionManager>>,
+ /// Connection settings used to create the pool's session manager.
+ session_config: SingleConnSessionConf,
+
+ /// Tag name interpolated into every query.
+ tag: String,
+ /// Property compared against the entry path in lookups.
+ key_field: String,
+ /// Property read back as the base64-encoded entry content.
+ value_field: String,
+}
+
+impl Debug for Adapter {
+    /// Formats the adapter for diagnostics; the session pool is left out.
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("Adapter");
+        out.field("session_config", &self.session_config);
+        out.field("tag", &self.tag);
+        out.field("key_field", &self.key_field);
+        out.field("value_field", &self.value_field);
+        out.finish()
+    }
+}
+
+impl Adapter {
+    /// Checks a session out of the pool, creating the pool on the first call.
+    ///
+    /// The pool is built with a maximum of 64 sessions from a clone of
+    /// `session_config`.
+    ///
+    /// # Errors
+    ///
+    /// Pool creation failures, session errors reported by the pool and
+    /// checkout timeouts are all returned as temporary
+    /// [`ErrorKind::Unexpected`] errors.
+    async fn get_session(&self) -> Result<PooledConnection<SingleConnSessionManager>> {
+        let session_pool = self
+            .session_pool
+            .get_or_try_init(|| async {
+                bb8::Pool::builder()
+                    .max_size(64)
+                    .build(SingleConnSessionManager::new(self.session_config.clone()))
+                    .await
+            })
+            .await
+            .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_string()).set_temporary())?;
+
+        session_pool.get().await.map_err(|err| match err {
+            RunError::User(err) => {
+                Error::new(ErrorKind::Unexpected, err.to_string()).set_temporary()
+            }
+            RunError::TimedOut => {
+                Error::new(ErrorKind::Unexpected, "connection request: timeout").set_temporary()
+            }
+        })
+    }
+}
+
+impl kv::Adapter for Adapter {
+    /// Describes the service: scheme, space name and supported operations.
+    fn metadata(&self) -> kv::Metadata {
+        kv::Metadata::new(
+            Scheme::NebulaGraph,
+            self.session_config
+                .space
+                .as_deref()
+                .expect("the builder always configures a space"),
+            Capability {
+                read: true,
+                write: true,
+                write_total_max_size: Some(1024 * 1024),
+                write_can_empty: true,
+                delete: true,
+                list: true,
+                ..Default::default()
+            },
+        )
+    }
+
+    /// Reads the value stored under `path`.
+    ///
+    /// Returns `Ok(None)` when the lookup yields no rows. Otherwise the first
+    /// row's value column is base64-decoded and returned.
+    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
+        let key = escape_string_literal(path);
+        let query = format!(
+            "LOOKUP ON {} WHERE {}.{} == '{}' YIELD properties(vertex).{} AS {};",
+            self.tag, self.tag, self.key_field, key, self.value_field, self.value_field
+        );
+        let mut sess = self.get_session().await?;
+        let result = sess
+            .query(&query)
+            .await
+            .map_err(parse_nebulagraph_session_error)?;
+        if result.is_empty() {
+            return Ok(None);
+        }
+
+        let row = result
+            .get_row_values_by_index(0)
+            .map_err(parse_nebulagraph_dataset_error)?;
+        let value = row
+            .get_value_by_col_name(&self.value_field)
+            .map_err(parse_nebulagraph_dataset_error)?;
+        let base64_str = value
+            .as_string()
+            .map_err(parse_nebulagraph_dataset_error)?;
+        let decoded = BASE64.decode(base64_str).map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "unhandled error from nebulagraph").set_source(err)
+        })?;
+        Ok(Some(Buffer::from(decoded)))
+    }
+
+    /// Stores `value` under `path`, replacing any existing entry.
+    ///
+    /// The old vertex is deleted and the new one inserted with two separate
+    /// statements, so the replacement is not atomic. The content is stored
+    /// base64-encoded under a freshly generated snowflake vertex id.
+    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
+        self.delete(path).await?;
+
+        let key = escape_string_literal(path);
+        let payload = BASE64.encode(value.to_vec());
+        let snowflake_id: u64 = GENERATOR.generate();
+        let query = format!(
+            "INSERT VERTEX {} VALUES {}:('{}', '{}');",
+            self.tag, snowflake_id, key, payload
+        );
+        let mut sess = self.get_session().await?;
+        sess.execute(&query)
+            .await
+            .map_err(parse_nebulagraph_session_error)?;
+
+        // To pass tests, we should confirm NebulaGraph has inserted data
+        // successfully. Sleep asynchronously so the executor thread is not
+        // blocked while polling, and propagate read errors instead of
+        // panicking.
+        #[cfg(feature = "tests")]
+        while self.get(path).await?.is_none() {
+            tokio::time::sleep(Duration::from_millis(1000)).await;
+        }
+        Ok(())
+    }
+
+    /// Deletes every vertex whose key property equals `path`.
+    async fn delete(&self, path: &str) -> Result<()> {
+        let key = escape_string_literal(path);
+        let query = format!(
+            "LOOKUP ON {} WHERE {}.{} == '{}' YIELD id(vertex) AS id | DELETE VERTEX $-.id;",
+            self.tag, self.tag, self.key_field, key
+        );
+        let mut sess = self.get_session().await?;
+        sess.execute(&query)
+            .await
+            .map_err(parse_nebulagraph_session_error)?;
+        Ok(())
+    }
+
+    /// Lists the keys of all entries whose key starts with `path`.
+    async fn scan(&self, path: &str) -> Result<Vec<String>> {
+        let prefix = escape_string_literal(path);
+        let query = format!(
+            "LOOKUP ON {} WHERE {}.{} STARTS WITH '{}' YIELD properties(vertex).{} AS {};",
+            self.tag, self.tag, self.key_field, prefix, self.key_field, self.key_field
+        );
+
+        let mut sess = self.get_session().await?;
+        let result = sess
+            .query(&query)
+            .await
+            .map_err(parse_nebulagraph_session_error)?;
+        let mut keys = Vec::new();
+        for row_i in 0..result.get_row_size() {
+            let row = result
+                .get_row_values_by_index(row_i)
+                .map_err(parse_nebulagraph_dataset_error)?;
+            let value = row
+                .get_value_by_col_name(&self.key_field)
+                .map_err(parse_nebulagraph_dataset_error)?;
+            keys.push(
+                value
+                    .as_string()
+                    .map_err(parse_nebulagraph_dataset_error)?,
+            );
+        }
+        Ok(keys)
+    }
+}
+
+/// Escapes `raw` for embedding inside a quoted nGQL string literal.
+///
+/// Backslashes are escaped first: otherwise a key ending in `\` would escape
+/// the closing quote of the literal, and sequences such as `\n` inside a key
+/// would be reinterpreted by the query parser. Single and double quotes are
+/// then backslash-escaped.
+fn escape_string_literal(raw: &str) -> String {
+    raw.replace('\\', "\\\\")
+        .replace('\'', "\\'")
+        .replace('"', "\\\"")
+}
+
+fn parse_nebulagraph_session_error(err: rust_nebula::SingleConnSessionError)
-> Error {
+ Error::new(ErrorKind::Unexpected, "unhandled error from
nebulagraph").set_source(err)
+}
+
+fn parse_nebulagraph_dataset_error(err: rust_nebula::DataSetError) -> Error {
+ Error::new(ErrorKind::Unexpected, "unhandled error from
nebulagraph").set_source(err)
+}
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 4691eda6b2..95dfa1c17a 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -292,6 +292,8 @@ impl Operator {
Scheme::HdfsNative =>
Self::from_iter::<services::HdfsNative>(iter)?.finish(),
#[cfg(feature = "services-lakefs")]
Scheme::Lakefs =>
Self::from_iter::<services::Lakefs>(iter)?.finish(),
+ #[cfg(feature = "services-nebula-graph")]
+ Scheme::NebulaGraph =>
Self::from_iter::<services::NebulaGraph>(iter)?.finish(),
v => {
return Err(Error::new(
ErrorKind::Unsupported,
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 4dd8acc69f..c0da5219b8 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -165,6 +165,8 @@ pub enum Scheme {
Surrealdb,
/// [lakefs](crate::services::Lakefs): LakeFS Services
Lakefs,
+ /// [NebulaGraph](crate::services::NebulaGraph): NebulaGraph Services
+ NebulaGraph,
/// Custom that allow users to implement services outside of OpenDAL.
///
/// # NOTE
@@ -315,6 +317,8 @@ impl Scheme {
Scheme::Surrealdb,
#[cfg(feature = "services-lakefs")]
Scheme::Lakefs,
+ #[cfg(feature = "services-nebula-graph")]
+ Scheme::NebulaGraph,
])
}
}
@@ -406,6 +410,7 @@ impl FromStr for Scheme {
"hdfs_native" => Ok(Scheme::HdfsNative),
"surrealdb" => Ok(Scheme::Surrealdb),
"lakefs" => Ok(Scheme::Lakefs),
+ "nebula_graph" => Ok(Scheme::NebulaGraph),
_ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))),
}
}
@@ -480,6 +485,7 @@ impl From<Scheme> for &'static str {
Scheme::HdfsNative => "hdfs_native",
Scheme::Surrealdb => "surrealdb",
Scheme::Lakefs => "lakefs",
+ Scheme::NebulaGraph => "nebula_graph",
Scheme::Custom(v) => v,
}
}