This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch lazy-reader in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 048cce063e3ef2c1e13d27717f6b5ebae9f1f9f0 Author: Xuanwo <[email protected]> AuthorDate: Wed Oct 25 21:08:20 2023 +0800 Remove parse_io_error Signed-off-by: Xuanwo <[email protected]> --- core/src/layers/madsim.rs | 4 -- core/src/services/fs/backend.rs | 81 ++++++++++++++++++---------------- core/src/services/fs/error.rs | 41 ----------------- core/src/services/fs/mod.rs | 1 - core/src/services/fs/pager.rs | 9 ++-- core/src/services/fs/writer.rs | 17 ++++--- core/src/services/hdfs/backend.rs | 54 ++++++++++++----------- core/src/services/hdfs/error.rs | 45 ------------------- core/src/services/hdfs/mod.rs | 1 - core/src/services/hdfs/writer.rs | 11 ++--- core/src/services/memcached/ascii.rs | 26 +++++------ core/src/services/memcached/backend.rs | 6 +-- core/src/services/sftp/backend.rs | 26 ++--------- core/src/services/sftp/error.rs | 1 + core/src/services/sftp/writer.rs | 9 ++-- 15 files changed, 113 insertions(+), 219 deletions(-) diff --git a/core/src/layers/madsim.rs b/core/src/layers/madsim.rs index d10451774..6e2a3b6a3 100644 --- a/core/src/layers/madsim.rs +++ b/core/src/layers/madsim.rs @@ -346,10 +346,6 @@ impl oio::Page for MadsimPager { } } -fn parse_io_error(e: std::io::Error) -> Error { - Error::new(ErrorKind::Unexpected, "madsim error") -} - /// A simulated server.This an experimental feature, docs are not ready yet. #[derive(Default, Clone)] pub struct MadsimServer; diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 9af013cf1..524c4ef5e 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -26,7 +26,6 @@ use chrono::DateTime; use log::debug; use uuid::Uuid; -use super::error::parse_io_error; use super::pager::FsPager; use super::writer::FsWriter; use crate::raw::*; @@ -211,7 +210,7 @@ impl FsBackend { })? .to_path_buf(); - std::fs::create_dir_all(parent).map_err(parse_io_error)?; + std::fs::create_dir_all(parent).map_err(new_std_io_error)?; Ok(p) } @@ -239,7 +238,7 @@ impl FsBackend { tokio::fs::create_dir_all(&parent) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; Ok(p) } @@ -290,7 +289,7 @@ impl Accessor for FsBackend { tokio::fs::create_dir_all(&p) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; Ok(RpCreateDir::default()) } @@ -313,11 +312,11 @@ impl Accessor for FsBackend { .read(true) .open(&p) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; if self.enable_path_check { // Get fs metadata of file at given path, ensuring it is not a false-positive due to slash normalization. - let meta = f.metadata().await.map_err(parse_io_error)?; + let meta = f.metadata().await.map_err(new_std_io_error)?; if meta.is_dir() != path.ends_with('/') { return Err(Error::new( ErrorKind::NotFound, @@ -338,21 +337,21 @@ impl Accessor for FsBackend { let start = f .seek(SeekFrom::End(size as i64)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; (start, Some(start + size)) } (Some(offset), None) => { let start = f .seek(SeekFrom::Start(offset)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; (start, None) } (Some(offset), Some(size)) => { let start = f .seek(SeekFrom::Start(offset)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; (start, Some(size)) } }; @@ -372,7 +371,7 @@ impl Accessor for FsBackend { if op.append() && tokio::fs::try_exists(&target_path) .await - .map_err(parse_io_error)? + .map_err(new_std_io_error)? { (target_path, None) } else { @@ -395,7 +394,7 @@ impl Accessor for FsBackend { let f = open_options .open(tmp_path.as_ref().unwrap_or(&target_path)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f))) } @@ -404,11 +403,11 @@ impl Accessor for FsBackend { let from = self.root.join(from.trim_end_matches('/')); // try to get the metadata of the source file to ensure it exists - tokio::fs::metadata(&from).await.map_err(parse_io_error)?; + tokio::fs::metadata(&from).await.map_err(new_std_io_error)?; let to = Self::ensure_write_abs_path(&self.root, to.trim_end_matches('/')).await?; - tokio::fs::copy(from, to).await.map_err(parse_io_error)?; + tokio::fs::copy(from, to).await.map_err(new_std_io_error)?; Ok(RpCopy::default()) } @@ -417,11 +416,13 @@ impl Accessor for FsBackend { let from = self.root.join(from.trim_end_matches('/')); // try to get the metadata of the source file to ensure it exists - tokio::fs::metadata(&from).await.map_err(parse_io_error)?; + tokio::fs::metadata(&from).await.map_err(new_std_io_error)?; let to = Self::ensure_write_abs_path(&self.root, to.trim_end_matches('/')).await?; - tokio::fs::rename(from, to).await.map_err(parse_io_error)?; + tokio::fs::rename(from, to) + .await + .map_err(new_std_io_error)?; Ok(RpRename::default()) } @@ -429,7 +430,7 @@ impl Accessor for FsBackend { async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { let p = self.root.join(path.trim_end_matches('/')); - let meta = tokio::fs::metadata(&p).await.map_err(parse_io_error)?; + let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?; if self.enable_path_check && meta.is_dir() != path.ends_with('/') { return Err(Error::new( @@ -450,7 +451,7 @@ impl Accessor for FsBackend { .with_last_modified( meta.modified() .map(DateTime::from) - .map_err(parse_io_error)?, + .map_err(new_std_io_error)?, ); Ok(RpStat::new(m)) @@ -464,15 +465,15 @@ impl Accessor for FsBackend { match meta { Ok(meta) => { if meta.is_dir() { - tokio::fs::remove_dir(&p).await.map_err(parse_io_error)?; + tokio::fs::remove_dir(&p).await.map_err(new_std_io_error)?; } else { - tokio::fs::remove_file(&p).await.map_err(parse_io_error)?; + tokio::fs::remove_file(&p).await.map_err(new_std_io_error)?; } Ok(RpDelete::default()) } Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(RpDelete::default()), - Err(err) => Err(parse_io_error(err)), + Err(err) => Err(new_std_io_error(err)), } } @@ -485,7 +486,7 @@ impl Accessor for FsBackend { return if e.kind() == std::io::ErrorKind::NotFound { Ok((RpList::default(), None)) } else { - Err(parse_io_error(e)) + Err(new_std_io_error(e)) }; } }; @@ -498,7 +499,7 @@ impl Accessor for FsBackend { fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> { let p = self.root.join(path.trim_end_matches('/')); - std::fs::create_dir_all(p).map_err(parse_io_error)?; + std::fs::create_dir_all(p).map_err(new_std_io_error)?; Ok(RpCreateDir::default()) } @@ -511,11 +512,11 @@ impl Accessor for FsBackend { let mut f = std::fs::OpenOptions::new() .read(true) .open(p) - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; if self.enable_path_check { // Get fs metadata of file at given path, ensuring it is not a false-positive due to slash normalization. - let meta = f.metadata().map_err(parse_io_error)?; + let meta = f.metadata().map_err(new_std_io_error)?; if meta.is_dir() != path.ends_with('/') { return Err(Error::new( ErrorKind::NotFound, @@ -533,15 +534,17 @@ impl Accessor for FsBackend { let (start, end) = match (args.range().offset(), args.range().size()) { (None, None) => (0, None), (None, Some(size)) => { - let start = f.seek(SeekFrom::End(size as i64)).map_err(parse_io_error)?; + let start = f + .seek(SeekFrom::End(size as i64)) + .map_err(new_std_io_error)?; (start, Some(start + size)) } (Some(offset), None) => { - let start = f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?; + let start = f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?; (start, None) } (Some(offset), Some(size)) => { - let start = f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?; + let start = f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?; (start, Some(size)) } }; @@ -561,7 +564,7 @@ impl Accessor for FsBackend { if op.append() && Path::new(&target_path) .try_exists() - .map_err(parse_io_error)? + .map_err(new_std_io_error)? { (target_path, None) } else { @@ -584,7 +587,7 @@ impl Accessor for FsBackend { let f = f .open(tmp_path.as_ref().unwrap_or(&target_path)) - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; Ok((RpWrite::new(), FsWriter::new(target_path, tmp_path, f))) } @@ -593,11 +596,11 @@ impl Accessor for FsBackend { let from = self.root.join(from.trim_end_matches('/')); // try to get the metadata of the source file to ensure it exists - std::fs::metadata(&from).map_err(parse_io_error)?; + std::fs::metadata(&from).map_err(new_std_io_error)?; let to = Self::blocking_ensure_write_abs_path(&self.root, to.trim_end_matches('/'))?; - std::fs::copy(from, to).map_err(parse_io_error)?; + std::fs::copy(from, to).map_err(new_std_io_error)?; Ok(RpCopy::default()) } @@ -606,11 +609,11 @@ impl Accessor for FsBackend { let from = self.root.join(from.trim_end_matches('/')); // try to get the metadata of the source file to ensure it exists - std::fs::metadata(&from).map_err(parse_io_error)?; + std::fs::metadata(&from).map_err(new_std_io_error)?; let to = Self::blocking_ensure_write_abs_path(&self.root, to.trim_end_matches('/'))?; - std::fs::rename(from, to).map_err(parse_io_error)?; + std::fs::rename(from, to).map_err(new_std_io_error)?; Ok(RpRename::default()) } @@ -618,7 +621,7 @@ impl Accessor for FsBackend { fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> { let p = self.root.join(path.trim_end_matches('/')); - let meta = std::fs::metadata(p).map_err(parse_io_error)?; + let meta = std::fs::metadata(p).map_err(new_std_io_error)?; if self.enable_path_check && meta.is_dir() != path.ends_with('/') { return Err(Error::new( @@ -639,7 +642,7 @@ impl Accessor for FsBackend { .with_last_modified( meta.modified() .map(DateTime::from) - .map_err(parse_io_error)?, + .map_err(new_std_io_error)?, ); Ok(RpStat::new(m)) @@ -653,15 +656,15 @@ impl Accessor for FsBackend { match meta { Ok(meta) => { if meta.is_dir() { - std::fs::remove_dir(&p).map_err(parse_io_error)?; + std::fs::remove_dir(&p).map_err(new_std_io_error)?; } else { - std::fs::remove_file(&p).map_err(parse_io_error)?; + std::fs::remove_file(&p).map_err(new_std_io_error)?; } Ok(RpDelete::default()) } Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(RpDelete::default()), - Err(err) => Err(parse_io_error(err)), + Err(err) => Err(new_std_io_error(err)), } } @@ -674,7 +677,7 @@ impl Accessor for FsBackend { return if e.kind() == std::io::ErrorKind::NotFound { Ok((RpList::default(), None)) } else { - Err(parse_io_error(e)) + Err(new_std_io_error(e)) }; } }; diff --git a/core/src/services/fs/error.rs b/core/src/services/fs/error.rs deleted file mode 100644 index de9f710ce..000000000 --- a/core/src/services/fs/error.rs +++ /dev/null @@ -1,41 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::io; - -use crate::Error; -use crate::ErrorKind; - -/// Parse all io related errors. -pub fn parse_io_error(err: io::Error) -> Error { - use io::ErrorKind::*; - - let (kind, retryable) = match err.kind() { - NotFound => (ErrorKind::NotFound, false), - PermissionDenied => (ErrorKind::PermissionDenied, false), - Interrupted | UnexpectedEof | TimedOut | WouldBlock => (ErrorKind::Unexpected, true), - _ => (ErrorKind::Unexpected, true), - }; - - let mut err = Error::new(kind, &err.kind().to_string()).set_source(err); - - if retryable { - err = err.set_temporary(); - } - - err -} diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/mod.rs index aa2a5fca1..28aae0814 100644 --- a/core/src/services/fs/mod.rs +++ b/core/src/services/fs/mod.rs @@ -18,6 +18,5 @@ mod backend; pub use backend::FsBuilder as Fs; -mod error; mod pager; mod writer; diff --git a/core/src/services/fs/pager.rs b/core/src/services/fs/pager.rs index f15ac8eaa..1c1e1fcce 100644 --- a/core/src/services/fs/pager.rs +++ b/core/src/services/fs/pager.rs @@ -20,7 +20,6 @@ use std::path::PathBuf; use async_trait::async_trait; -use super::error::parse_io_error; use crate::raw::*; use crate::EntryMode; use crate::Metadata; @@ -49,7 +48,7 @@ impl oio::Page for FsPager<tokio::fs::ReadDir> { let mut oes: Vec<oio::Entry> = Vec::with_capacity(self.size); for _ in 0..self.size { - let de = match self.rd.next_entry().await.map_err(parse_io_error)? { + let de = match self.rd.next_entry().await.map_err(new_std_io_error)? { Some(de) => de, None => break, }; @@ -67,7 +66,7 @@ impl oio::Page for FsPager<tokio::fs::ReadDir> { // (no extra system calls needed), but some Unix platforms may // require the equivalent call to symlink_metadata to learn about // the target file type. - let file_type = de.file_type().await.map_err(parse_io_error)?; + let file_type = de.file_type().await.map_err(new_std_io_error)?; let d = if file_type.is_file() { oio::Entry::new(&rel_path, Metadata::new(EntryMode::FILE)) @@ -91,7 +90,7 @@ impl oio::BlockingPage for FsPager<std::fs::ReadDir> { for _ in 0..self.size { let de = match self.rd.next() { - Some(de) => de.map_err(parse_io_error)?, + Some(de) => de.map_err(new_std_io_error)?, None => break, }; @@ -108,7 +107,7 @@ impl oio::BlockingPage for FsPager<std::fs::ReadDir> { // (no extra system calls needed), but some Unix platforms may // require the equivalent call to symlink_metadata to learn about // the target file type. - let file_type = de.file_type().map_err(parse_io_error)?; + let file_type = de.file_type().map_err(new_std_io_error)?; let d = if file_type.is_file() { oio::Entry::new(&rel_path, Metadata::new(EntryMode::FILE)) diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index d1283d4ca..bd41f5611 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -28,7 +28,6 @@ use futures::FutureExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; -use super::error::parse_io_error; use crate::raw::*; use crate::*; @@ -64,7 +63,7 @@ impl oio::Write for FsWriter<tokio::fs::File> { Pin::new(f) .poll_write_vectored(cx, &bs.vectored_chunk()) - .map_err(parse_io_error) + .map_err(new_std_io_error) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { @@ -79,13 +78,13 @@ impl oio::Write for FsWriter<tokio::fs::File> { let tmp_path = self.tmp_path.clone(); let target_path = self.target_path.clone(); self.fut = Some(Box::pin(async move { - f.flush().await.map_err(parse_io_error)?; - f.sync_all().await.map_err(parse_io_error)?; + f.flush().await.map_err(new_std_io_error)?; + f.sync_all().await.map_err(new_std_io_error)?; if let Some(tmp_path) = &tmp_path { tokio::fs::rename(tmp_path, &target_path) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; } Ok(()) @@ -107,7 +106,7 @@ impl oio::Write for FsWriter<tokio::fs::File> { if let Some(tmp_path) = &tmp_path { tokio::fs::remove_file(tmp_path) .await - .map_err(parse_io_error) + .map_err(new_std_io_error) } else { Err(Error::new( ErrorKind::Unsupported, @@ -124,15 +123,15 @@ impl oio::BlockingWrite for FsWriter<std::fs::File> { let f = self.f.as_mut().expect("FsWriter must be initialized"); f.write_vectored(&bs.vectored_chunk()) - .map_err(parse_io_error) + .map_err(new_std_io_error) } fn close(&mut self) -> Result<()> { if let Some(f) = self.f.take() { - f.sync_all().map_err(parse_io_error)?; + f.sync_all().map_err(new_std_io_error)?; if let Some(tmp_path) = &self.tmp_path { - std::fs::rename(tmp_path, &self.target_path).map_err(parse_io_error)?; + std::fs::rename(tmp_path, &self.target_path).map_err(new_std_io_error)?; } } diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index 4d093656b..a7fb23f0c 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -25,7 +25,6 @@ use std::sync::Arc; use async_trait::async_trait; use log::debug; -use super::error::parse_io_error; use super::pager::HdfsPager; use super::writer::HdfsWriter; use crate::raw::*; @@ -127,14 +126,14 @@ impl Builder for HdfsBuilder { builder = builder.with_user(user.as_str()); } - let client = builder.connect().map_err(parse_io_error)?; + let client = builder.connect().map_err(new_std_io_error)?; // Create root dir if not exist. if let Err(e) = client.metadata(&root) { if e.kind() == io::ErrorKind::NotFound { debug!("root {} is not exist, creating now", root); - client.create_dir(&root).map_err(parse_io_error)? + client.create_dir(&root).map_err(new_std_io_error)? } } @@ -198,7 +197,7 @@ impl Accessor for HdfsBackend { async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> { let p = build_rooted_abs_path(&self.root, path); - self.client.create_dir(&p).map_err(parse_io_error)?; + self.client.create_dir(&p).map_err(new_std_io_error)?; Ok(RpCreateDir::default()) } @@ -214,7 +213,7 @@ impl Accessor for HdfsBackend { .read(true) .async_open(&p) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; let (start, end) = match (args.range().offset(), args.range().size()) { (None, None) => (0, None), @@ -222,21 +221,21 @@ impl Accessor for HdfsBackend { let start = f .seek(SeekFrom::End(size as i64)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; (start, Some(start + size)) } (Some(offset), None) => { let start = f .seek(SeekFrom::Start(offset)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; (start, None) } (Some(offset), Some(size)) => { let start = f .seek(SeekFrom::Start(offset)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; (start, Some(size)) } }; @@ -262,7 +261,7 @@ impl Accessor for HdfsBackend { self.client .create_dir(&parent.to_string_lossy()) - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; let mut open_options = self.client.open_file(); open_options.create(true); @@ -272,7 +271,10 @@ impl Accessor for HdfsBackend { open_options.write(true); } - let f = open_options.async_open(&p).await.map_err(parse_io_error)?; + let f = open_options + .async_open(&p) + .await + .map_err(new_std_io_error)?; Ok((RpWrite::new(), HdfsWriter::new(f))) } @@ -280,7 +282,7 @@ impl Accessor for HdfsBackend { async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { let p = build_rooted_abs_path(&self.root, path); - let meta = self.client.metadata(&p).map_err(parse_io_error)?; + let meta = self.client.metadata(&p).map_err(new_std_io_error)?; let mode = if meta.is_dir() { EntryMode::DIR @@ -305,7 +307,7 @@ impl Accessor for HdfsBackend { return if err.kind() == io::ErrorKind::NotFound { Ok(RpDelete::default()) } else { - Err(parse_io_error(err)) + Err(new_std_io_error(err)) }; } @@ -318,7 +320,7 @@ impl Accessor for HdfsBackend { self.client.remove_file(&p) }; - result.map_err(parse_io_error)?; + result.map_err(new_std_io_error)?; Ok(RpDelete::default()) } @@ -332,7 +334,7 @@ impl Accessor for HdfsBackend { return if e.kind() == io::ErrorKind::NotFound { Ok((RpList::default(), None)) } else { - Err(parse_io_error(e)) + Err(new_std_io_error(e)) } } }; @@ -345,7 +347,7 @@ impl Accessor for HdfsBackend { fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> { let p = build_rooted_abs_path(&self.root, path); - self.client.create_dir(&p).map_err(parse_io_error)?; + self.client.create_dir(&p).map_err(new_std_io_error)?; Ok(RpCreateDir::default()) } @@ -360,20 +362,22 @@ impl Accessor for HdfsBackend { .open_file() .read(true) .open(&p) - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; let (start, end) = match (args.range().offset(), args.range().size()) { (None, None) => (0, None), (None, Some(size)) => { - let start = f.seek(SeekFrom::End(size as i64)).map_err(parse_io_error)?; + let start = f + .seek(SeekFrom::End(size as i64)) + .map_err(new_std_io_error)?; (start, Some(start + size)) } (Some(offset), None) => { - let start = f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?; + let start = f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?; (start, None) } (Some(offset), Some(size)) => { - let start = f.seek(SeekFrom::Start(offset)).map_err(parse_io_error)?; + let start = f.seek(SeekFrom::Start(offset)).map_err(new_std_io_error)?; (start, Some(size)) } }; @@ -399,7 +403,7 @@ impl Accessor for HdfsBackend { self.client .create_dir(&parent.to_string_lossy()) - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; let f = self .client @@ -407,7 +411,7 @@ impl Accessor for HdfsBackend { .create(true) .write(true) .open(&p) - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; Ok((RpWrite::new(), HdfsWriter::new(f))) } @@ -415,7 +419,7 @@ impl Accessor for HdfsBackend { fn blocking_stat(&self, path: &str, _: OpStat) -> Result<RpStat> { let p = build_rooted_abs_path(&self.root, path); - let meta = self.client.metadata(&p).map_err(parse_io_error)?; + let meta = self.client.metadata(&p).map_err(new_std_io_error)?; let mode = if meta.is_dir() { EntryMode::DIR @@ -440,7 +444,7 @@ impl Accessor for HdfsBackend { return if err.kind() == io::ErrorKind::NotFound { Ok(RpDelete::default()) } else { - Err(parse_io_error(err)) + Err(new_std_io_error(err)) }; } @@ -453,7 +457,7 @@ impl Accessor for HdfsBackend { self.client.remove_file(&p) }; - result.map_err(parse_io_error)?; + result.map_err(new_std_io_error)?; Ok(RpDelete::default()) } @@ -467,7 +471,7 @@ impl Accessor for HdfsBackend { return if e.kind() == io::ErrorKind::NotFound { Ok((RpList::default(), None)) } else { - Err(parse_io_error(e)) + Err(new_std_io_error(e)) } } }; diff --git a/core/src/services/hdfs/error.rs b/core/src/services/hdfs/error.rs deleted file mode 100644 index f97fada23..000000000 --- a/core/src/services/hdfs/error.rs +++ /dev/null @@ -1,45 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::io; - -use crate::Error; -use crate::ErrorKind; - -/// Parse all path related errors. -/// -/// ## Notes -/// -/// Skip utf-8 check to allow invalid path input. -pub fn parse_io_error(err: io::Error) -> Error { - use io::ErrorKind::*; - - let (kind, retryable) = match err.kind() { - NotFound => (ErrorKind::NotFound, false), - PermissionDenied => (ErrorKind::PermissionDenied, false), - Interrupted | UnexpectedEof | TimedOut | WouldBlock => (ErrorKind::Unexpected, true), - _ => (ErrorKind::Unexpected, true), - }; - - let mut err = Error::new(kind, &err.kind().to_string()).set_source(err); - - if retryable { - err = err.set_temporary(); - } - - err -} diff --git a/core/src/services/hdfs/mod.rs b/core/src/services/hdfs/mod.rs index c1e98a3b6..996a654fa 100644 --- a/core/src/services/hdfs/mod.rs +++ b/core/src/services/hdfs/mod.rs @@ -18,6 +18,5 @@ mod backend; pub use backend::HdfsBuilder as Hdfs; -mod error; mod pager; mod writer; diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index a436a8416..4990df40a 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -23,7 +23,6 @@ use std::task::Poll; use async_trait::async_trait; use futures::AsyncWrite; -use super::error::parse_io_error; use crate::raw::*; use crate::*; @@ -42,7 +41,7 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { Pin::new(&mut self.f) .poll_write(cx, bs.chunk()) - .map_err(parse_io_error) + .map_err(new_std_io_error) } fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { @@ -53,17 +52,19 @@ impl oio::Write for HdfsWriter<hdrs::AsyncFile> { } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - Pin::new(&mut self.f).poll_close(cx).map_err(parse_io_error) + Pin::new(&mut self.f) + .poll_close(cx) + .map_err(new_std_io_error) } } impl oio::BlockingWrite for HdfsWriter<hdrs::File> { fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result<usize> { - self.f.write(bs.chunk()).map_err(parse_io_error) + self.f.write(bs.chunk()).map_err(new_std_io_error) } fn close(&mut self) -> Result<()> { - self.f.flush().map_err(parse_io_error)?; + self.f.flush().map_err(new_std_io_error)?; Ok(()) } diff --git a/core/src/services/memcached/ascii.rs b/core/src/services/memcached/ascii.rs index 12ba7589d..6a7908893 100644 --- a/core/src/services/memcached/ascii.rs +++ b/core/src/services/memcached/ascii.rs @@ -21,7 +21,7 @@ use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio::net::TcpStream; -use super::backend::parse_io_error; +use crate::raw::*; use crate::*; pub struct Connection { @@ -43,8 +43,8 @@ impl Connection { writer .write_all(&[b"get ", key.as_bytes(), b"\r\n"].concat()) .await - .map_err(parse_io_error)?; - writer.flush().await.map_err(parse_io_error)?; + .map_err(new_std_io_error)?; + writer.flush().await.map_err(new_std_io_error)?; // Read response header let header = self.read_header().await?; @@ -71,7 +71,7 @@ impl Connection { self.io .read_exact(&mut buffer) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; // Read the trailing header self.read_line().await?; // \r\n @@ -85,10 +85,10 @@ impl Connection { self.io .write_all(header.as_bytes()) .await - .map_err(parse_io_error)?; - self.io.write_all(val).await.map_err(parse_io_error)?; - self.io.write_all(b"\r\n").await.map_err(parse_io_error)?; - self.io.flush().await.map_err(parse_io_error)?; + .map_err(new_std_io_error)?; + self.io.write_all(val).await.map_err(new_std_io_error)?; + self.io.write_all(b"\r\n").await.map_err(new_std_io_error)?; + self.io.flush().await.map_err(new_std_io_error)?; // Read response header let header = self.read_header().await?; @@ -110,8 +110,8 @@ impl Connection { self.io .write_all(header.as_bytes()) .await - .map_err(parse_io_error)?; - self.io.flush().await.map_err(parse_io_error)?; + .map_err(new_std_io_error)?; + self.io.flush().await.map_err(new_std_io_error)?; // Read response header let header = self.read_header().await?; @@ -132,8 +132,8 @@ impl Connection { self.io .write_all(b"version\r\n") .await - .map_err(parse_io_error)?; - self.io.flush().await.map_err(parse_io_error)?; + .map_err(new_std_io_error)?; + self.io.flush().await.map_err(new_std_io_error)?; // Read response header let header = self.read_header().await?; @@ -151,7 +151,7 @@ impl Connection { async fn read_line(&mut self) -> Result<&[u8]> { let Self { io, buf } = self; buf.clear(); - io.read_until(b'\n', buf).await.map_err(parse_io_error)?; + io.read_until(b'\n', buf).await.map_err(new_std_io_error)?; if buf.last().copied() != Some(b'\n') { return Err(Error::new( ErrorKind::ContentIncomplete, diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs index 2731a70a1..91c127de8 100644 --- a/core/src/services/memcached/backend.rs +++ b/core/src/services/memcached/backend.rs @@ -250,7 +250,7 @@ impl bb8::ManageConnection for MemcacheConnectionManager { async fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> { let conn = TcpStream::connect(&self.address) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; Ok(ascii::Connection::new(conn)) } @@ -262,7 +262,3 @@ impl bb8::ManageConnection for MemcacheConnectionManager { false } } - -pub fn parse_io_error(err: std::io::Error) -> Error { - Error::new(ErrorKind::Unexpected, &err.kind().to_string()).set_source(err) -} diff --git a/core/src/services/sftp/backend.rs b/core/src/services/sftp/backend.rs index 774221534..258c77066 100644 --- a/core/src/services/sftp/backend.rs +++ b/core/src/services/sftp/backend.rs @@ -303,21 +303,21 @@ impl Accessor for SftpBackend { let start = f .seek(SeekFrom::End(size as i64)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; (start, Some(start + size)) } (Some(offset), None) => { let start = f .seek(SeekFrom::Start(offset)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; (start, None) } (Some(offset), Some(size)) => { let start = f .seek(SeekFrom::Start(offset)) .await - .map_err(parse_io_error)?; + .map_err(new_std_io_error)?; (start, Some(size)) } }; @@ -558,23 +558,3 @@ async fn connect_sftp( Ok(sftp) } - -/// Parse all io related errors. -pub fn parse_io_error(err: std::io::Error) -> Error { - use std::io::ErrorKind::*; - - let (kind, retryable) = match err.kind() { - NotFound => (ErrorKind::NotFound, false), - PermissionDenied => (ErrorKind::PermissionDenied, false), - Interrupted | UnexpectedEof | TimedOut | WouldBlock => (ErrorKind::Unexpected, true), - _ => (ErrorKind::Unexpected, true), - }; - - let mut err = Error::new(kind, &err.kind().to_string()).set_source(err); - - if retryable { - err = err.set_temporary(); - } - - err -} diff --git a/core/src/services/sftp/error.rs b/core/src/services/sftp/error.rs index 9c0124e77..adb8d079e 100644 --- a/core/src/services/sftp/error.rs +++ b/core/src/services/sftp/error.rs @@ -45,6 +45,7 @@ impl From<SftpClientError> for Error { } } +/// REMOVE ME: it's not allowed to impl <T> for Error. impl From<SshError> for Error { fn from(e: SshError) -> Self { Error::new(ErrorKind::Unexpected, "ssh error").set_source(e) diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index c6a2aa6be..c80d8de9b 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -45,11 +45,14 @@ impl oio::Write for SftpWriter { self.file .as_mut() .poll_write(cx, bs.chunk()) - .map_err(parse_io_error) + .map_err(new_std_io_error) } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - self.file.as_mut().poll_shutdown(cx).map_err(parse_io_error) + self.file + .as_mut() + .poll_shutdown(cx) + .map_err(new_std_io_error) } fn poll_abort(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> { @@ -60,6 +63,6 @@ impl oio::Write for SftpWriter { } } -fn parse_io_error(err: std::io::Error) -> Error { +fn new_std_io_error(err: std::io::Error) -> Error { Error::new(ErrorKind::Unexpected, "read from sftp").set_source(err) }
