This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new d99cac4289 feat(services): add hdfs native layout (#3933)
d99cac4289 is described below
commit d99cac42894d54da810ad747e1ff1730dcb4d7d7
Author: Shubham Raizada <[email protected]>
AuthorDate: Wed Jan 17 07:32:52 2024 +0530
feat(services): add hdfs native layout (#3933)
* intial commit for native-hdfs service
* revert async-tls change
* review comments on layout
* remove doc
* fix import issues
* fix import errors
* change service name
* parse hdfs-native error into opendal error
* cargo fmt
* reader and writer implementations
* revert implementation details
* cargo fmt and clippy
* review comments
* rename to error.rs
* fix clippy unused issue for now
* commit cargo.lock file
* Fix cargo lock changes
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
---
Cargo.lock | 83 ++++++++++-
core/Cargo.toml | 3 +
core/src/services/hdfs_native/backend.rs | 241 +++++++++++++++++++++++++++++++
core/src/services/hdfs_native/docs.md | 1 +
core/src/services/hdfs_native/error.rs | 53 +++++++
core/src/services/hdfs_native/lister.rs | 42 ++++++
core/src/services/hdfs_native/mod.rs | 25 ++++
core/src/services/hdfs_native/reader.rs | 57 ++++++++
core/src/services/hdfs_native/writer.rs | 49 +++++++
core/src/services/mod.rs | 7 +
core/src/types/operator/builder.rs | 2 +
core/src/types/scheme.rs | 6 +
12 files changed, 568 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
index cd56abe92a..f0476a220a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2777,6 +2777,34 @@ dependencies = [
"byteorder",
]
+[[package]]
+name = "g2gen"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc2c7625b2fc250dd90b63f7887a6bb0f7ec1d714c8278415bea2669ef20820e"
+dependencies = [
+ "g2poly",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "g2p"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc36d9bdc3d2da057775a9f4fa7d7b09edab3e0eda7a92cc353358fa63b8519e"
+dependencies = [
+ "g2gen",
+ "g2poly",
+]
+
+[[package]]
+name = "g2poly"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af6a86e750338603ea2c14b1c0bfe58cd61f87ca67a0021d9334996024608e12"
+
[[package]]
name = "generator"
version = "0.7.5"
@@ -2939,6 +2967,31 @@ dependencies = [
"hashbrown 0.14.2",
]
+[[package]]
+name = "hdfs-native"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "270a4d5e17b0a3e252ecf3c85bd62d7ad89cb423c1dde8e58cfe50458e9c6066"
+dependencies = [
+ "base64 0.21.5",
+ "bytes",
+ "crc",
+ "futures",
+ "g2p",
+ "libc",
+ "log",
+ "num-traits",
+ "prost 0.11.9",
+ "prost-types 0.11.9",
+ "roxmltree",
+ "socket2 0.5.5",
+ "thiserror",
+ "tokio",
+ "url",
+ "users",
+ "uuid",
+]
+
[[package]]
name = "hdfs-sys"
version = "0.3.0"
@@ -4547,6 +4600,7 @@ dependencies = [
"futures",
"getrandom 0.2.11",
"governor",
+ "hdfs-native",
"hdrs",
"hmac",
"hrana-client-proto",
@@ -5447,7 +5501,7 @@ dependencies = [
"petgraph",
"prettyplease",
"prost 0.12.3",
- "prost-types",
+ "prost-types 0.12.3",
"regex",
"syn 2.0.39",
"tempfile",
@@ -5480,6 +5534,15 @@ dependencies = [
"syn 2.0.39",
]
+[[package]]
+name = "prost-types"
+version = "0.11.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
+dependencies = [
+ "prost 0.11.9",
+]
+
[[package]]
name = "prost-types"
version = "0.12.3"
@@ -6083,6 +6146,15 @@ dependencies = [
"librocksdb-sys",
]
+[[package]]
+name = "roxmltree"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "862340e351ce1b271a378ec53f304a5558f7db87f3769dc655a8f6ecbb68b302"
+dependencies = [
+ "xmlparser",
+]
+
[[package]]
name = "rsa"
version = "0.9.4"
@@ -7743,6 +7815,15 @@ version = "2.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
+[[package]]
+name = "users"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "24cc0f6d6f267b73e5a2cadf007ba8f9bc39c6a6f9666f8cf25ea809a153b032"
+dependencies = [
+ "libc",
+]
+
[[package]]
name = "utf8parse"
version = "0.2.1"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 87c3b32d2f..d04dce44fb 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -200,6 +200,7 @@ services-wasabi = []
services-webdav = []
services-webhdfs = []
services-yandex-disk = []
+services-hdfs-native = ["hdfs-native"]
[lib]
bench = false
@@ -332,6 +333,8 @@ suppaftp = { version = "5.2", default-features = false,
features = [
], optional = true }
# for services-tikv
tikv-client = { version = "0.3.0", optional = true, default-features = false }
+# for services-hdfs-native
+hdfs-native = { version = "0.6.0", optional = true}
# Layers
# for layers-async-backtrace
diff --git a/core/src/services/hdfs_native/backend.rs
b/core/src/services/hdfs_native/backend.rs
new file mode 100644
index 0000000000..5c5412b3d8
--- /dev/null
+++ b/core/src/services/hdfs_native/backend.rs
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use hdfs_native::WriteOptions;
+use log::debug;
+use serde::Deserialize;
+// use uuid::Uuid;
+
+use super::error::parse_hdfs_error;
+use super::lister::HdfsNativeLister;
+use super::reader::HdfsNativeReader;
+use super::writer::HdfsNativeWriter;
+use crate::raw::*;
+use crate::*;
+
+/// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/)
support.
+/// Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native).
+
+/// Config for HdfsNative services support.
+#[derive(Default, Deserialize, Clone)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct HdfsNativeConfig {
+ /// work dir of this backend
+ pub root: Option<String>,
+ /// url of this backend
+ pub url: Option<String>,
+ /// enable the append capacity
+ pub enable_append: bool,
+}
+
+impl Debug for HdfsNativeConfig {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("HdfsNativeConfig")
+ .field("root", &self.root)
+ .field("url", &self.url)
+ .field("enable_append", &self.enable_append)
+ .finish_non_exhaustive()
+ }
+}
+
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct HdfsNativeBuilder {
+ config: HdfsNativeConfig,
+}
+
+impl Debug for HdfsNativeBuilder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("HdfsNativeBuilder")
+ .field("config", &self.config)
+ .finish()
+ }
+}
+
+impl HdfsNativeBuilder {
+ /// Set root of this backend.
+ ///
+ /// All operations will happen under this root.
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ self.config.root = if root.is_empty() {
+ None
+ } else {
+ Some(root.to_string())
+ };
+
+ self
+ }
+
+ /// Set url of this backend.
+ ///
+ /// Valid format including:
+ ///
+ /// - `default`: using the default setting based on hadoop config.
+ /// - `hdfs://127.0.0.1:9000`: connect to hdfs cluster.
+ pub fn url(&mut self, url: &str) -> &mut Self {
+ if !url.is_empty() {
+ // Trim trailing `/` so that we can accept `http://127.0.0.1:9000/`
+ self.config.url = Some(url.trim_end_matches('/').to_string())
+ }
+
+ self
+ }
+
+ /// Enable append capacity of this backend.
+ ///
+ /// This should be disabled when HDFS runs in non-distributed mode.
+ pub fn enable_append(&mut self, enable_append: bool) -> &mut Self {
+ self.config.enable_append = enable_append;
+ self
+ }
+}
+
+impl Builder for HdfsNativeBuilder {
+ const SCHEME: Scheme = Scheme::HdfsNative;
+ type Accessor = HdfsNativeBackend;
+
+ fn from_map(map: HashMap<String, String>) -> Self {
+ // Deserialize the configuration from the HashMap.
+ let config =
HdfsNativeConfig::deserialize(ConfigDeserializer::new(map))
+ .expect("config deserialize must succeed");
+
+ // Create an HdfsNativeBuilder instance with the deserialized config.
+ HdfsNativeBuilder { config }
+ }
+
+ fn build(&mut self) -> Result<Self::Accessor> {
+ debug!("backend build started: {:?}", &self);
+
+ let url = match &self.config.url {
+ Some(v) => v,
+ None => {
+ return Err(Error::new(ErrorKind::ConfigInvalid, "url is empty")
+ .with_context("service", Scheme::HdfsNative))
+ }
+ };
+
+ let root =
normalize_root(&self.config.root.take().unwrap_or_default());
+ debug!("backend use root {}", root);
+
+ let client = hdfs_native::Client::new(url).map_err(parse_hdfs_error)?;
+
+ // need to check if root dir exists, create if not
+
+ debug!("backend build finished: {:?}", &self);
+ Ok(HdfsNativeBackend {
+ root,
+ client: Arc::new(client),
+ _enable_append: self.config.enable_append,
+ })
+ }
+}
+
+// #[inline]
+// fn tmp_file_of(path: &str) -> String {
+// let name = get_basename(path);
+// let uuid = Uuid::new_v4().to_string();
+//
+// format!("{name}.{uuid}")
+// }
+
+/// Backend for hdfs-native services.
+#[derive(Debug, Clone)]
+pub struct HdfsNativeBackend {
+ root: String,
+ client: Arc<hdfs_native::Client>,
+ _enable_append: bool,
+}
+
+/// hdfs_native::Client is thread-safe.
+unsafe impl Send for HdfsNativeBackend {}
+unsafe impl Sync for HdfsNativeBackend {}
+
+#[async_trait]
+impl Accessor for HdfsNativeBackend {
+ type Reader = HdfsNativeReader;
+ type BlockingReader = ();
+ type Writer = HdfsNativeWriter;
+ type BlockingWriter = ();
+ type Lister = Option<HdfsNativeLister>;
+ type BlockingLister = ();
+
+ fn info(&self) -> AccessorInfo {
+ todo!()
+ }
+
+ async fn create_dir(&self, path: &str, _args: OpCreateDir) ->
Result<RpCreateDir> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ self.client
+ .mkdirs(&p, 0o777, true)
+ .await
+ .map_err(parse_hdfs_error)?;
+ Ok(RpCreateDir::default())
+ }
+
+ async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
+
+ let r = HdfsNativeReader::new(f);
+
+ Ok((RpRead::new(), r))
+ }
+
+ async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ let p = build_rooted_abs_path(&self.root, path);
+
+ let f = self
+ .client
+ .create(&p, WriteOptions::default())
+ .await
+ .map_err(parse_hdfs_error)?;
+
+ let w = HdfsNativeWriter::new(f);
+
+ Ok((RpWrite::new(), w))
+ }
+
+ async fn copy(&self, _from: &str, _to: &str, _args: OpCopy) ->
Result<RpCopy> {
+ todo!()
+ }
+
+ async fn rename(&self, _from: &str, _to: &str, _args: OpRename) ->
Result<RpRename> {
+ todo!()
+ }
+
+ async fn stat(&self, _path: &str, _args: OpStat) -> Result<RpStat> {
+ todo!()
+ }
+
+ async fn delete(&self, _path: &str, _args: OpDelete) -> Result<RpDelete> {
+ todo!()
+ }
+
+ async fn list(&self, path: &str, _args: OpList) -> Result<(RpList,
Self::Lister)> {
+ let p = build_rooted_abs_path(&self.root, path);
+ let l = HdfsNativeLister::new(p, self.client.clone());
+ Ok((RpList::default(), Some(l)))
+ }
+}
diff --git a/core/src/services/hdfs_native/docs.md
b/core/src/services/hdfs_native/docs.md
new file mode 100644
index 0000000000..7d720d4986
--- /dev/null
+++ b/core/src/services/hdfs_native/docs.md
@@ -0,0 +1 @@
+A distributed file system that provides high-throughput access to application
data.
diff --git a/core/src/services/hdfs_native/error.rs
b/core/src/services/hdfs_native/error.rs
new file mode 100644
index 0000000000..4432716535
--- /dev/null
+++ b/core/src/services/hdfs_native/error.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+use hdfs_native::HdfsError;
+
+/// Parse hdfs-native error into opendal::Error.
+pub fn parse_hdfs_error(hdfs_error: HdfsError) -> Error {
+ let (kind, retryable, msg) = match &hdfs_error {
+ HdfsError::IOError(err) => (ErrorKind::Unexpected, false,
err.to_string()),
+ HdfsError::DataTransferError(msg) => (ErrorKind::Unexpected, false,
msg.clone()),
+ HdfsError::ChecksumError => (
+ ErrorKind::Unexpected,
+ false,
+ "checksums didn't match".to_string(),
+ ),
+ HdfsError::InvalidPath(msg) => (ErrorKind::InvalidInput, false,
msg.clone()),
+ HdfsError::InvalidArgument(msg) => (ErrorKind::InvalidInput, false,
msg.clone()),
+ HdfsError::UrlParseError(err) => (ErrorKind::Unexpected, false,
err.to_string()),
+ HdfsError::AlreadyExists(msg) => (ErrorKind::AlreadyExists, false,
msg.clone()),
+ HdfsError::OperationFailed(msg) => (ErrorKind::Unexpected, false,
msg.clone()),
+ HdfsError::FileNotFound(msg) => (ErrorKind::NotFound, false,
msg.clone()),
+ HdfsError::BlocksNotFound(msg) => (ErrorKind::NotFound, false,
msg.clone()),
+ HdfsError::IsADirectoryError(msg) => (ErrorKind::IsADirectory, false,
msg.clone()),
+ _ => (
+ ErrorKind::Unexpected,
+ false,
+ "unexpected error from hdfs".to_string(),
+ ),
+ };
+
+ let mut err = Error::new(kind, &msg).set_source(hdfs_error);
+
+ if retryable {
+ err = err.set_temporary();
+ }
+
+ err
+}
diff --git a/core/src/services/hdfs_native/lister.rs
b/core/src/services/hdfs_native/lister.rs
new file mode 100644
index 0000000000..deb4336446
--- /dev/null
+++ b/core/src/services/hdfs_native/lister.rs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::oio;
+use crate::raw::oio::Entry;
+use crate::*;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+pub struct HdfsNativeLister {
+ _path: String,
+ _client: Arc<hdfs_native::Client>,
+}
+
+impl HdfsNativeLister {
+ pub fn new(path: String, client: Arc<hdfs_native::Client>) -> Self {
+ HdfsNativeLister {
+ _path: path,
+ _client: client,
+ }
+ }
+}
+
+impl oio::List for HdfsNativeLister {
+ fn poll_next(&mut self, _cx: &mut Context<'_>) ->
Poll<Result<Option<Entry>>> {
+ todo!()
+ }
+}
diff --git a/core/src/services/hdfs_native/mod.rs
b/core/src/services/hdfs_native/mod.rs
new file mode 100644
index 0000000000..a65cacb0a6
--- /dev/null
+++ b/core/src/services/hdfs_native/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::HdfsNativeBuilder as HdfsNative;
+pub use backend::HdfsNativeConfig;
+
+mod error;
+mod lister;
+mod reader;
+mod writer;
diff --git a/core/src/services/hdfs_native/reader.rs
b/core/src/services/hdfs_native/reader.rs
new file mode 100644
index 0000000000..babbbf32b1
--- /dev/null
+++ b/core/src/services/hdfs_native/reader.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::oio::Read;
+use crate::*;
+use bytes::Bytes;
+use hdfs_native::file::FileReader;
+use std::io::SeekFrom;
+use std::task::{Context, Poll};
+
+pub struct HdfsNativeReader {
+ _f: FileReader,
+}
+
+impl HdfsNativeReader {
+ pub fn new(f: FileReader) -> Self {
+ HdfsNativeReader { _f: f }
+ }
+}
+
+impl Read for HdfsNativeReader {
+ fn poll_read(&mut self, _cx: &mut Context<'_>, _buf: &mut [u8]) ->
Poll<Result<usize>> {
+ todo!()
+ }
+
+ fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) ->
Poll<Result<u64>> {
+ let (_, _) = (cx, pos);
+
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "HdfsNativeReader doesn't support seeking",
+ )))
+ }
+
+ fn poll_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<Bytes>>> {
+ let _ = cx;
+
+ Poll::Ready(Some(Err(Error::new(
+ ErrorKind::Unsupported,
+ "HdfsNativeReader doesn't support iterating",
+ ))))
+ }
+}
diff --git a/core/src/services/hdfs_native/writer.rs
b/core/src/services/hdfs_native/writer.rs
new file mode 100644
index 0000000000..de0349eacf
--- /dev/null
+++ b/core/src/services/hdfs_native/writer.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::oio;
+use crate::raw::oio::WriteBuf;
+use crate::*;
+use hdfs_native::file::FileWriter;
+use std::task::{Context, Poll};
+
+pub struct HdfsNativeWriter {
+ _f: FileWriter,
+}
+
+impl HdfsNativeWriter {
+ pub fn new(f: FileWriter) -> Self {
+ HdfsNativeWriter { _f: f }
+ }
+}
+
+impl oio::Write for HdfsNativeWriter {
+ fn poll_write(&mut self, _cx: &mut Context<'_>, _bs: &dyn WriteBuf) ->
Poll<Result<usize>> {
+ todo!()
+ }
+
+ fn poll_close(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
+ todo!()
+ }
+
+ fn poll_abort(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
+ Poll::Ready(Err(Error::new(
+ ErrorKind::Unsupported,
+ "HdfsNativeWriter doesn't support abort",
+ )))
+ }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 146efb962b..c09a61e913 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -353,6 +353,13 @@ pub use pcloud::Pcloud;
#[cfg(feature = "services-pcloud")]
pub use pcloud::PcloudConfig;
+#[cfg(feature = "services-hdfs-native")]
+mod hdfs_native;
+#[cfg(feature = "services-hdfs-native")]
+pub use hdfs_native::HdfsNative;
+#[cfg(feature = "services-hdfs-native")]
+pub use hdfs_native::HdfsNativeConfig;
+
#[cfg(feature = "services-yandex-disk")]
mod yandex_disk;
#[cfg(feature = "services-yandex-disk")]
diff --git a/core/src/types/operator/builder.rs
b/core/src/types/operator/builder.rs
index 30dcd251ed..ab63e22238 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -261,6 +261,8 @@ impl Operator {
Scheme::Redb => Self::from_map::<services::Redb>(map)?.finish(),
#[cfg(feature = "services-mongodb")]
Scheme::Mongodb =>
Self::from_map::<services::Mongodb>(map)?.finish(),
+ #[cfg(feature = "services-hdfs-native")]
+ Scheme::HdfsNative =>
Self::from_map::<services::HdfsNative>(map)?.finish(),
v => {
return Err(Error::new(
ErrorKind::Unsupported,
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index cdb4cf2f67..289fb9ca36 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -145,6 +145,8 @@ pub enum Scheme {
Mongodb,
/// [gridfs](crate::services::gridfs): MongoDB Gridfs Services
Gridfs,
+ /// [Native HDFS](crate::services::hdfs_native): Hdfs Native service,
using rust hdfs-native client for hdfs
+ HdfsNative,
/// Custom that allow users to implement services outside of OpenDAL.
///
/// # NOTE
@@ -279,6 +281,8 @@ impl Scheme {
Scheme::Redb,
#[cfg(feature = "services-mongodb")]
Scheme::Mongodb,
+ #[cfg(feature = "services-hdfs-native")]
+ Scheme::HdfsNative,
])
}
}
@@ -360,6 +364,7 @@ impl FromStr for Scheme {
"tikv" => Ok(Scheme::Tikv),
"azfile" => Ok(Scheme::Azfile),
"mongodb" => Ok(Scheme::Mongodb),
+ "hdfs_native" => Ok(Scheme::HdfsNative),
_ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))),
}
}
@@ -424,6 +429,7 @@ impl From<Scheme> for &'static str {
Scheme::Upyun => "upyun",
Scheme::YandexDisk => "yandex_disk",
Scheme::Pcloud => "pcloud",
+ Scheme::HdfsNative => "hdfs_native",
Scheme::Custom(v) => v,
}
}