This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 8d23c1597 refactor(service/hdfs): Add HdfsConfig to implement
ConfigDeserializer (#3800)
8d23c1597 is described below
commit 8d23c159758f3b6d4e72f2d2982a4f7ca2825571
Author: Shubham Raizada <[email protected]>
AuthorDate: Fri Dec 22 13:14:37 2023 +0530
refactor(service/hdfs): Add HdfsConfig to implement ConfigDeserializer
(#3800)
* Add HdfsConfig to implement ConfigDeserializer
* fix conflicting impl debug
* add doc for struct fields
---
core/src/services/hdfs/backend.rs | 90 ++++++++++++++++++++++++++-------------
core/src/services/hdfs/mod.rs | 2 +-
core/src/services/mod.rs | 2 +
3 files changed, 64 insertions(+), 30 deletions(-)
diff --git a/core/src/services/hdfs/backend.rs
b/core/src/services/hdfs/backend.rs
index bdff53de9..b9255a16d 100644
--- a/core/src/services/hdfs/backend.rs
+++ b/core/src/services/hdfs/backend.rs
@@ -16,7 +16,7 @@
// under the License.
use std::collections::HashMap;
-use std::fmt::Debug;
+use std::fmt::{Debug, Formatter};
use std::io;
use std::path::PathBuf;
use std::sync::Arc;
@@ -24,6 +24,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use futures::AsyncWriteExt;
use log::debug;
+use serde::Deserialize;
use super::lister::HdfsLister;
use super::writer::HdfsWriter;
@@ -31,14 +32,51 @@ use crate::raw::*;
use crate::*;
/// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/)
support.
+
+/// Config for Hdfs services support.
+#[derive(Default, Deserialize, Clone)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct HdfsConfig {
+ /// work dir of this backend
+ pub root: Option<String>,
+ /// name node of this backend
+ pub name_node: Option<String>,
+ /// kerberos_ticket_cache_path of this backend
+ pub kerberos_ticket_cache_path: Option<String>,
+ /// user of this backend
+ pub user: Option<String>,
+ /// enable the append capacity
+ pub enable_append: bool,
+}
+
+impl Debug for HdfsConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("HdfsConfig")
+ .field("root", &self.root)
+ .field("name_node", &self.name_node)
+ .field(
+ "kerberos_ticket_cache_path",
+ &self.kerberos_ticket_cache_path,
+ )
+ .field("user", &self.user)
+ .field("enable_append", &self.enable_append)
+ .finish_non_exhaustive()
+ }
+}
+
#[doc = include_str!("docs.md")]
-#[derive(Default, Debug)]
+#[derive(Default)]
pub struct HdfsBuilder {
- root: Option<String>,
- name_node: Option<String>,
- kerberos_ticket_cache_path: Option<String>,
- user: Option<String>,
- enable_append: bool,
+ config: HdfsConfig,
+}
+
+impl Debug for HdfsBuilder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("HdfsBuilder")
+ .field("config", &self.config)
+ .finish()
+ }
}
impl HdfsBuilder {
@@ -46,7 +84,7 @@ impl HdfsBuilder {
///
/// All operations will happen under this root.
pub fn root(&mut self, root: &str) -> &mut Self {
- self.root = if root.is_empty() {
+ self.config.root = if root.is_empty() {
None
} else {
Some(root.to_string())
@@ -64,7 +102,7 @@ impl HdfsBuilder {
pub fn name_node(&mut self, name_node: &str) -> &mut Self {
if !name_node.is_empty() {
// Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
- self.name_node = Some(name_node.trim_end_matches('/').to_string())
+ self.config.name_node =
Some(name_node.trim_end_matches('/').to_string())
}
self
@@ -75,7 +113,7 @@ impl HdfsBuilder {
/// This should be configured when kerberos is enabled.
pub fn kerberos_ticket_cache_path(&mut self, kerberos_ticket_cache_path:
&str) -> &mut Self {
if !kerberos_ticket_cache_path.is_empty() {
- self.kerberos_ticket_cache_path =
Some(kerberos_ticket_cache_path.to_string())
+ self.config.kerberos_ticket_cache_path =
Some(kerberos_ticket_cache_path.to_string())
}
self
}
@@ -83,7 +121,7 @@ impl HdfsBuilder {
/// Set user of this backend
pub fn user(&mut self, user: &str) -> &mut Self {
if !user.is_empty() {
- self.user = Some(user.to_string())
+ self.config.user = Some(user.to_string())
}
self
}
@@ -92,7 +130,7 @@ impl HdfsBuilder {
///
/// This should be disabled when HDFS runs in non-distributed mode.
pub fn enable_append(&mut self, enable_append: bool) -> &mut Self {
- self.enable_append = enable_append;
+ self.config.enable_append = enable_append;
self
}
}
@@ -102,24 +140,18 @@ impl Builder for HdfsBuilder {
type Accessor = HdfsBackend;
fn from_map(map: HashMap<String, String>) -> Self {
- let mut builder = HdfsBuilder::default();
-
- map.get("root").map(|v| builder.root(v));
- map.get("name_node").map(|v| builder.name_node(v));
- map.get("kerberos_ticket_cache_path")
- .map(|v| builder.kerberos_ticket_cache_path(v));
- map.get("user").map(|v| builder.user(v));
- map.get("enable_append").map(|v| {
- builder.enable_append(v.parse().expect("enable_append should be
true or false"))
- });
-
- builder
+ // Deserialize the configuration from the HashMap.
+ let config = HdfsConfig::deserialize(ConfigDeserializer::new(map))
+ .expect("config deserialize must succeed");
+
+ // Create an HdfsBuilder instance with the deserialized config.
+ HdfsBuilder { config }
}
fn build(&mut self) -> Result<Self::Accessor> {
debug!("backend build started: {:?}", &self);
- let name_node = match &self.name_node {
+ let name_node = match &self.config.name_node {
Some(v) => v,
None => {
return Err(Error::new(ErrorKind::ConfigInvalid, "name node is
empty")
@@ -127,14 +159,14 @@ impl Builder for HdfsBuilder {
}
};
- let root = normalize_root(&self.root.take().unwrap_or_default());
+ let root =
normalize_root(&self.config.root.take().unwrap_or_default());
debug!("backend use root {}", root);
let mut builder = hdrs::ClientBuilder::new(name_node);
- if let Some(ticket_cache_path) = &self.kerberos_ticket_cache_path {
+ if let Some(ticket_cache_path) =
&self.config.kerberos_ticket_cache_path {
builder =
builder.with_kerberos_ticket_cache_path(ticket_cache_path.as_str());
}
- if let Some(user) = &self.user {
+ if let Some(user) = &self.config.user {
builder = builder.with_user(user.as_str());
}
@@ -153,7 +185,7 @@ impl Builder for HdfsBuilder {
Ok(HdfsBackend {
root,
client: Arc::new(client),
- enable_append: self.enable_append,
+ enable_append: self.config.enable_append,
})
}
}
diff --git a/core/src/services/hdfs/mod.rs b/core/src/services/hdfs/mod.rs
index da4ae5c63..f5d6506fb 100644
--- a/core/src/services/hdfs/mod.rs
+++ b/core/src/services/hdfs/mod.rs
@@ -17,6 +17,6 @@
mod backend;
pub use backend::HdfsBuilder as Hdfs;
-
+pub use backend::HdfsConfig;
mod lister;
mod writer;
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index a25f8d7bf..db4586e40 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -84,6 +84,8 @@ pub use gridfs::Gridfs;
mod hdfs;
#[cfg(feature = "services-hdfs")]
pub use hdfs::Hdfs;
+#[cfg(feature = "services-hdfs")]
+pub use hdfs::HdfsConfig;
#[cfg(feature = "services-http")]
mod http;