This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new d99cac4289 feat(services): add hdfs native layout (#3933)
d99cac4289 is described below

commit d99cac42894d54da810ad747e1ff1730dcb4d7d7
Author: Shubham Raizada <[email protected]>
AuthorDate: Wed Jan 17 07:32:52 2024 +0530

    feat(services): add hdfs native layout (#3933)
    
    * initial commit for native-hdfs service
    
    * revert async-tls change
    
    * review comments on layout
    
    * remove doc
    
    * fix import issues
    
    * fix import errors
    
    * change service name
    
    * parse hdfs-native error into opendal error
    
    * cargo fmt
    
    * reader and writer implementations
    
    * revert implementation details
    
    * cargo fmt and clippy
    
    * review comments
    
    * rename to error.rs
    
    * fix clippy unused issue for now
    
    * commit cargo.lock file
    
    * Fix cargo lock changes
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
    Co-authored-by: Xuanwo <[email protected]>
---
 Cargo.lock                               |  83 ++++++++++-
 core/Cargo.toml                          |   3 +
 core/src/services/hdfs_native/backend.rs | 241 +++++++++++++++++++++++++++++++
 core/src/services/hdfs_native/docs.md    |   1 +
 core/src/services/hdfs_native/error.rs   |  53 +++++++
 core/src/services/hdfs_native/lister.rs  |  42 ++++++
 core/src/services/hdfs_native/mod.rs     |  25 ++++
 core/src/services/hdfs_native/reader.rs  |  57 ++++++++
 core/src/services/hdfs_native/writer.rs  |  49 +++++++
 core/src/services/mod.rs                 |   7 +
 core/src/types/operator/builder.rs       |   2 +
 core/src/types/scheme.rs                 |   6 +
 12 files changed, 568 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock
index cd56abe92a..f0476a220a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2777,6 +2777,34 @@ dependencies = [
  "byteorder",
 ]
 
+[[package]]
+name = "g2gen"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc2c7625b2fc250dd90b63f7887a6bb0f7ec1d714c8278415bea2669ef20820e"
+dependencies = [
+ "g2poly",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
+[[package]]
+name = "g2p"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fc36d9bdc3d2da057775a9f4fa7d7b09edab3e0eda7a92cc353358fa63b8519e"
+dependencies = [
+ "g2gen",
+ "g2poly",
+]
+
+[[package]]
+name = "g2poly"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "af6a86e750338603ea2c14b1c0bfe58cd61f87ca67a0021d9334996024608e12"
+
 [[package]]
 name = "generator"
 version = "0.7.5"
@@ -2939,6 +2967,31 @@ dependencies = [
  "hashbrown 0.14.2",
 ]
 
+[[package]]
+name = "hdfs-native"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "270a4d5e17b0a3e252ecf3c85bd62d7ad89cb423c1dde8e58cfe50458e9c6066"
+dependencies = [
+ "base64 0.21.5",
+ "bytes",
+ "crc",
+ "futures",
+ "g2p",
+ "libc",
+ "log",
+ "num-traits",
+ "prost 0.11.9",
+ "prost-types 0.11.9",
+ "roxmltree",
+ "socket2 0.5.5",
+ "thiserror",
+ "tokio",
+ "url",
+ "users",
+ "uuid",
+]
+
 [[package]]
 name = "hdfs-sys"
 version = "0.3.0"
@@ -4547,6 +4600,7 @@ dependencies = [
  "futures",
  "getrandom 0.2.11",
  "governor",
+ "hdfs-native",
  "hdrs",
  "hmac",
  "hrana-client-proto",
@@ -5447,7 +5501,7 @@ dependencies = [
  "petgraph",
  "prettyplease",
  "prost 0.12.3",
- "prost-types",
+ "prost-types 0.12.3",
  "regex",
  "syn 2.0.39",
  "tempfile",
@@ -5480,6 +5534,15 @@ dependencies = [
  "syn 2.0.39",
 ]
 
+[[package]]
+name = "prost-types"
+version = "0.11.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
+dependencies = [
+ "prost 0.11.9",
+]
+
 [[package]]
 name = "prost-types"
 version = "0.12.3"
@@ -6083,6 +6146,15 @@ dependencies = [
  "librocksdb-sys",
 ]
 
+[[package]]
+name = "roxmltree"
+version = "0.18.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "862340e351ce1b271a378ec53f304a5558f7db87f3769dc655a8f6ecbb68b302"
+dependencies = [
+ "xmlparser",
+]
+
 [[package]]
 name = "rsa"
 version = "0.9.4"
@@ -7743,6 +7815,15 @@ version = "2.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
 
+[[package]]
+name = "users"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "24cc0f6d6f267b73e5a2cadf007ba8f9bc39c6a6f9666f8cf25ea809a153b032"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "utf8parse"
 version = "0.2.1"
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 87c3b32d2f..d04dce44fb 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -200,6 +200,7 @@ services-wasabi = []
 services-webdav = []
 services-webhdfs = []
 services-yandex-disk = []
+services-hdfs-native = ["hdfs-native"]
 
 [lib]
 bench = false
@@ -332,6 +333,8 @@ suppaftp = { version = "5.2", default-features = false, features = [
 ], optional = true }
 # for services-tikv
 tikv-client = { version = "0.3.0", optional = true, default-features = false }
+# for services-hdfs-native
+hdfs-native = { version = "0.6.0", optional = true}
 
 # Layers
 # for layers-async-backtrace
diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs
new file mode 100644
index 0000000000..5c5412b3d8
--- /dev/null
+++ b/core/src/services/hdfs_native/backend.rs
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use hdfs_native::WriteOptions;
+use log::debug;
+use serde::Deserialize;
+// use uuid::Uuid;
+
+use super::error::parse_hdfs_error;
+use super::lister::HdfsNativeLister;
+use super::reader::HdfsNativeReader;
+use super::writer::HdfsNativeWriter;
+use crate::raw::*;
+use crate::*;
+
+/// [Hadoop Distributed File System (HDFS™)](https://hadoop.apache.org/) support.
+/// Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native).
+
+/// Config for HdfsNative services support.
+#[derive(Default, Deserialize, Clone)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct HdfsNativeConfig {
+    /// work dir of this backend
+    pub root: Option<String>,
+    /// url of this backend
+    pub url: Option<String>,
+    /// enable the append capability
+    pub enable_append: bool,
+}
+
+impl Debug for HdfsNativeConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HdfsNativeConfig")
+            .field("root", &self.root)
+            .field("url", &self.url)
+            .field("enable_append", &self.enable_append)
+            .finish_non_exhaustive()
+    }
+}
+
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct HdfsNativeBuilder {
+    config: HdfsNativeConfig,
+}
+
+impl Debug for HdfsNativeBuilder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("HdfsNativeBuilder")
+            .field("config", &self.config)
+            .finish()
+    }
+}
+
+impl HdfsNativeBuilder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// Set url of this backend.
+    ///
+    /// Valid format including:
+    ///
+    /// - `default`: using the default setting based on hadoop config.
+    /// - `hdfs://127.0.0.1:9000`: connect to hdfs cluster.
+    pub fn url(&mut self, url: &str) -> &mut Self {
+        if !url.is_empty() {
+            // Trim trailing `/` so that we can accept `hdfs://127.0.0.1:9000/`
+            self.config.url = Some(url.trim_end_matches('/').to_string())
+        }
+
+        self
+    }
+
+    /// Enable append capability of this backend.
+    ///
+    /// This should be disabled when HDFS runs in non-distributed mode.
+    pub fn enable_append(&mut self, enable_append: bool) -> &mut Self {
+        self.config.enable_append = enable_append;
+        self
+    }
+}
+
+impl Builder for HdfsNativeBuilder {
+    const SCHEME: Scheme = Scheme::HdfsNative;
+    type Accessor = HdfsNativeBackend;
+
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = HdfsNativeConfig::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an HdfsNativeBuilder instance with the deserialized config.
+        HdfsNativeBuilder { config }
+    }
+
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let url = match &self.config.url {
+            Some(v) => v,
+            None => {
+                return Err(Error::new(ErrorKind::ConfigInvalid, "url is empty")
+                    .with_context("service", Scheme::HdfsNative))
+            }
+        };
+
+        let root = normalize_root(&self.config.root.take().unwrap_or_default());
+        debug!("backend use root {}", root);
+
+        let client = hdfs_native::Client::new(url).map_err(parse_hdfs_error)?;
+
+        // need to check if root dir exists, create if not
+
+        debug!("backend build finished: {:?}", &self);
+        Ok(HdfsNativeBackend {
+            root,
+            client: Arc::new(client),
+            _enable_append: self.config.enable_append,
+        })
+    }
+}
+
+// #[inline]
+// fn tmp_file_of(path: &str) -> String {
+//     let name = get_basename(path);
+//     let uuid = Uuid::new_v4().to_string();
+//
+//     format!("{name}.{uuid}")
+// }
+
+/// Backend for hdfs-native services.
+#[derive(Debug, Clone)]
+pub struct HdfsNativeBackend {
+    root: String,
+    client: Arc<hdfs_native::Client>,
+    _enable_append: bool,
+}
+
+/// hdfs_native::Client is thread-safe.
+unsafe impl Send for HdfsNativeBackend {}
+unsafe impl Sync for HdfsNativeBackend {}
+
+#[async_trait]
+impl Accessor for HdfsNativeBackend {
+    type Reader = HdfsNativeReader;
+    type BlockingReader = ();
+    type Writer = HdfsNativeWriter;
+    type BlockingWriter = ();
+    type Lister = Option<HdfsNativeLister>;
+    type BlockingLister = ();
+
+    fn info(&self) -> AccessorInfo {
+        todo!()
+    }
+
+    async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result<RpCreateDir> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        self.client
+            .mkdirs(&p, 0o777, true)
+            .await
+            .map_err(parse_hdfs_error)?;
+        Ok(RpCreateDir::default())
+    }
+
+    async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
+
+        let r = HdfsNativeReader::new(f);
+
+        Ok((RpRead::new(), r))
+    }
+
+    async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let f = self
+            .client
+            .create(&p, WriteOptions::default())
+            .await
+            .map_err(parse_hdfs_error)?;
+
+        let w = HdfsNativeWriter::new(f);
+
+        Ok((RpWrite::new(), w))
+    }
+
+    async fn copy(&self, _from: &str, _to: &str, _args: OpCopy) -> Result<RpCopy> {
+        todo!()
+    }
+
+    async fn rename(&self, _from: &str, _to: &str, _args: OpRename) -> Result<RpRename> {
+        todo!()
+    }
+
+    async fn stat(&self, _path: &str, _args: OpStat) -> Result<RpStat> {
+        todo!()
+    }
+
+    async fn delete(&self, _path: &str, _args: OpDelete) -> Result<RpDelete> {
+        todo!()
+    }
+
+    async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
+        let p = build_rooted_abs_path(&self.root, path);
+        let l = HdfsNativeLister::new(p, self.client.clone());
+        Ok((RpList::default(), Some(l)))
+    }
+}
diff --git a/core/src/services/hdfs_native/docs.md b/core/src/services/hdfs_native/docs.md
new file mode 100644
index 0000000000..7d720d4986
--- /dev/null
+++ b/core/src/services/hdfs_native/docs.md
@@ -0,0 +1 @@
+A distributed file system that provides high-throughput access to application data.
diff --git a/core/src/services/hdfs_native/error.rs b/core/src/services/hdfs_native/error.rs
new file mode 100644
index 0000000000..4432716535
--- /dev/null
+++ b/core/src/services/hdfs_native/error.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+use hdfs_native::HdfsError;
+
+/// Parse hdfs-native error into opendal::Error.
+pub fn parse_hdfs_error(hdfs_error: HdfsError) -> Error {
+    let (kind, retryable, msg) = match &hdfs_error {
+        HdfsError::IOError(err) => (ErrorKind::Unexpected, false, err.to_string()),
+        HdfsError::DataTransferError(msg) => (ErrorKind::Unexpected, false, msg.clone()),
+        HdfsError::ChecksumError => (
+            ErrorKind::Unexpected,
+            false,
+            "checksums didn't match".to_string(),
+        ),
+        HdfsError::InvalidPath(msg) => (ErrorKind::InvalidInput, false, msg.clone()),
+        HdfsError::InvalidArgument(msg) => (ErrorKind::InvalidInput, false, msg.clone()),
+        HdfsError::UrlParseError(err) => (ErrorKind::Unexpected, false, err.to_string()),
+        HdfsError::AlreadyExists(msg) => (ErrorKind::AlreadyExists, false, msg.clone()),
+        HdfsError::OperationFailed(msg) => (ErrorKind::Unexpected, false, msg.clone()),
+        HdfsError::FileNotFound(msg) => (ErrorKind::NotFound, false, msg.clone()),
+        HdfsError::BlocksNotFound(msg) => (ErrorKind::NotFound, false, msg.clone()),
+        HdfsError::IsADirectoryError(msg) => (ErrorKind::IsADirectory, false, msg.clone()),
+        _ => (
+            ErrorKind::Unexpected,
+            false,
+            "unexpected error from hdfs".to_string(),
+        ),
+    };
+
+    let mut err = Error::new(kind, &msg).set_source(hdfs_error);
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    err
+}
diff --git a/core/src/services/hdfs_native/lister.rs b/core/src/services/hdfs_native/lister.rs
new file mode 100644
index 0000000000..deb4336446
--- /dev/null
+++ b/core/src/services/hdfs_native/lister.rs
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::oio;
+use crate::raw::oio::Entry;
+use crate::*;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+pub struct HdfsNativeLister {
+    _path: String,
+    _client: Arc<hdfs_native::Client>,
+}
+
+impl HdfsNativeLister {
+    pub fn new(path: String, client: Arc<hdfs_native::Client>) -> Self {
+        HdfsNativeLister {
+            _path: path,
+            _client: client,
+        }
+    }
+}
+
+impl oio::List for HdfsNativeLister {
+    fn poll_next(&mut self, _cx: &mut Context<'_>) -> Poll<Result<Option<Entry>>> {
+        todo!()
+    }
+}
diff --git a/core/src/services/hdfs_native/mod.rs b/core/src/services/hdfs_native/mod.rs
new file mode 100644
index 0000000000..a65cacb0a6
--- /dev/null
+++ b/core/src/services/hdfs_native/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::HdfsNativeBuilder as HdfsNative;
+pub use backend::HdfsNativeConfig;
+
+mod error;
+mod lister;
+mod reader;
+mod writer;
diff --git a/core/src/services/hdfs_native/reader.rs b/core/src/services/hdfs_native/reader.rs
new file mode 100644
index 0000000000..babbbf32b1
--- /dev/null
+++ b/core/src/services/hdfs_native/reader.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::oio::Read;
+use crate::*;
+use bytes::Bytes;
+use hdfs_native::file::FileReader;
+use std::io::SeekFrom;
+use std::task::{Context, Poll};
+
+pub struct HdfsNativeReader {
+    _f: FileReader,
+}
+
+impl HdfsNativeReader {
+    pub fn new(f: FileReader) -> Self {
+        HdfsNativeReader { _f: f }
+    }
+}
+
+impl Read for HdfsNativeReader {
+    fn poll_read(&mut self, _cx: &mut Context<'_>, _buf: &mut [u8]) -> Poll<Result<usize>> {
+        todo!()
+    }
+
+    fn poll_seek(&mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<Result<u64>> {
+        let (_, _) = (cx, pos);
+
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "HdfsNativeReader doesn't support seeking",
+        )))
+    }
+
+    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
+        let _ = cx;
+
+        Poll::Ready(Some(Err(Error::new(
+            ErrorKind::Unsupported,
+            "HdfsNativeReader doesn't support iterating",
+        ))))
+    }
+}
diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs
new file mode 100644
index 0000000000..de0349eacf
--- /dev/null
+++ b/core/src/services/hdfs_native/writer.rs
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::raw::oio;
+use crate::raw::oio::WriteBuf;
+use crate::*;
+use hdfs_native::file::FileWriter;
+use std::task::{Context, Poll};
+
+pub struct HdfsNativeWriter {
+    _f: FileWriter,
+}
+
+impl HdfsNativeWriter {
+    pub fn new(f: FileWriter) -> Self {
+        HdfsNativeWriter { _f: f }
+    }
+}
+
+impl oio::Write for HdfsNativeWriter {
+    fn poll_write(&mut self, _cx: &mut Context<'_>, _bs: &dyn WriteBuf) -> Poll<Result<usize>> {
+        todo!()
+    }
+
+    fn poll_close(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
+        todo!()
+    }
+
+    fn poll_abort(&mut self, _cx: &mut Context<'_>) -> Poll<Result<()>> {
+        Poll::Ready(Err(Error::new(
+            ErrorKind::Unsupported,
+            "HdfsNativeWriter doesn't support abort",
+        )))
+    }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 146efb962b..c09a61e913 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -353,6 +353,13 @@ pub use pcloud::Pcloud;
 #[cfg(feature = "services-pcloud")]
 pub use pcloud::PcloudConfig;
 
+#[cfg(feature = "services-hdfs-native")]
+mod hdfs_native;
+#[cfg(feature = "services-hdfs-native")]
+pub use hdfs_native::HdfsNative;
+#[cfg(feature = "services-hdfs-native")]
+pub use hdfs_native::HdfsNativeConfig;
+
 #[cfg(feature = "services-yandex-disk")]
 mod yandex_disk;
 #[cfg(feature = "services-yandex-disk")]
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 30dcd251ed..ab63e22238 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -261,6 +261,8 @@ impl Operator {
             Scheme::Redb => Self::from_map::<services::Redb>(map)?.finish(),
             #[cfg(feature = "services-mongodb")]
             Scheme::Mongodb => Self::from_map::<services::Mongodb>(map)?.finish(),
+            #[cfg(feature = "services-hdfs-native")]
+            Scheme::HdfsNative => Self::from_map::<services::HdfsNative>(map)?.finish(),
             v => {
                 return Err(Error::new(
                     ErrorKind::Unsupported,
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index cdb4cf2f67..289fb9ca36 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -145,6 +145,8 @@ pub enum Scheme {
     Mongodb,
     /// [gridfs](crate::services::gridfs): MongoDB Gridfs Services
     Gridfs,
+    /// [Native HDFS](crate::services::hdfs_native): Hdfs Native service, using rust hdfs-native client for hdfs
+    HdfsNative,
     /// Custom that allow users to implement services outside of OpenDAL.
     ///
     /// # NOTE
@@ -279,6 +281,8 @@ impl Scheme {
             Scheme::Redb,
             #[cfg(feature = "services-mongodb")]
             Scheme::Mongodb,
+            #[cfg(feature = "services-hdfs-native")]
+            Scheme::HdfsNative,
         ])
     }
 }
@@ -360,6 +364,7 @@ impl FromStr for Scheme {
             "tikv" => Ok(Scheme::Tikv),
             "azfile" => Ok(Scheme::Azfile),
             "mongodb" => Ok(Scheme::Mongodb),
+            "hdfs_native" => Ok(Scheme::HdfsNative),
             _ => Ok(Scheme::Custom(Box::leak(s.into_boxed_str()))),
         }
     }
@@ -424,6 +429,7 @@ impl From<Scheme> for &'static str {
             Scheme::Upyun => "upyun",
             Scheme::YandexDisk => "yandex_disk",
             Scheme::Pcloud => "pcloud",
+            Scheme::HdfsNative => "hdfs_native",
             Scheme::Custom(v) => v,
         }
     }

Reply via email to