This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch refactor-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 51af9fd076cb60ee69af19fdcba86ea66be545e1 Author: Xuanwo <[email protected]> AuthorDate: Thu Sep 7 23:21:02 2023 +0800 Polish write Signed-off-by: Xuanwo <[email protected]> --- core/benches/oio/utils.rs | 4 ++-- core/benches/oio/write.rs | 4 ++-- core/src/raw/adapters/kv/backend.rs | 1 - core/src/raw/adapters/typed_kv/backend.rs | 1 - core/src/raw/oio/write/api.rs | 2 -- core/src/raw/oio/write/append_object_write.rs | 1 - core/src/raw/oio/write/compose_write.rs | 1 - core/src/raw/oio/write/multipart_upload_write.rs | 1 - core/src/raw/oio/write/one_shot_write.rs | 1 - core/src/services/azblob/writer.rs | 1 - core/src/services/azdfs/writer.rs | 1 - core/src/services/dropbox/writer.rs | 1 - core/src/services/fs/writer.rs | 10 ++-------- core/src/services/ftp/writer.rs | 1 - core/src/services/ghac/writer.rs | 1 - core/src/services/hdfs/writer.rs | 7 +------ core/src/services/ipmfs/writer.rs | 1 - core/src/services/sftp/writer.rs | 1 - core/src/services/vercel_artifacts/writer.rs | 1 - core/src/services/wasabi/writer.rs | 1 - core/src/services/webdav/writer.rs | 1 - core/src/services/webhdfs/writer.rs | 1 - core/src/types/writer.rs | 8 ++++++-- core/tests/behavior/write.rs | 6 +++--- 24 files changed, 16 insertions(+), 42 deletions(-) diff --git a/core/benches/oio/utils.rs b/core/benches/oio/utils.rs index bde28481e..92aa56450 100644 --- a/core/benches/oio/utils.rs +++ b/core/benches/oio/utils.rs @@ -26,8 +26,8 @@ pub struct BlackHoleWriter; #[async_trait] impl oio::Write for BlackHoleWriter { - async fn write(&mut self, bs: Bytes) -> opendal::Result<u64> { - Ok(bs.len() as u64) + async fn write(&mut self, bs: &dyn opendal::Buf) -> opendal::Result<usize> { + Ok(bs.remaining()) } async fn abort(&mut self) -> opendal::Result<()> { diff --git a/core/benches/oio/write.rs b/core/benches/oio/write.rs index a227d4901..8da2a6dd0 100644 --- a/core/benches/oio/write.rs +++ b/core/benches/oio/write.rs @@ -48,8 +48,8 @@ pub fn bench_exact_buf_write(c: &mut Criterion) { let mut bs = content.clone(); while !bs.is_empty() { - let n = w.write(bs.clone()).await.unwrap(); - bs.advance(n as usize); + let n = w.write(&bs).await.unwrap(); + bs.advance(n); } w.close().await.unwrap(); }) diff --git a/core/src/raw/adapters/kv/backend.rs b/core/src/raw/adapters/kv/backend.rs index 7af41dced..53b37a119 100644 --- a/core/src/raw/adapters/kv/backend.rs +++ b/core/src/raw/adapters/kv/backend.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use super::Adapter; use crate::raw::*; diff --git a/core/src/raw/adapters/typed_kv/backend.rs b/core/src/raw/adapters/typed_kv/backend.rs index 6ed086ba8..14879d436 100644 --- a/core/src/raw/adapters/typed_kv/backend.rs +++ b/core/src/raw/adapters/typed_kv/backend.rs @@ -22,7 +22,6 @@ use bytes::Bytes; use super::Adapter; use super::Value; -use crate::raw::oio::VectorCursor; use crate::raw::*; use crate::*; diff --git a/core/src/raw/oio/write/api.rs b/core/src/raw/oio/write/api.rs index 606f7e9bc..66af2de08 100644 --- a/core/src/raw/oio/write/api.rs +++ b/core/src/raw/oio/write/api.rs @@ -19,9 +19,7 @@ use std::fmt::Display; use std::fmt::Formatter; use async_trait::async_trait; -use bytes::Bytes; -use crate::raw::oio; use crate::*; /// WriteOperation is the name for APIs of Writer. diff --git a/core/src/raw/oio/write/append_object_write.rs b/core/src/raw/oio/write/append_object_write.rs index 7f9e8c043..cbf5ef194 100644 --- a/core/src/raw/oio/write/append_object_write.rs +++ b/core/src/raw/oio/write/append_object_write.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use crate::raw::*; use crate::*; diff --git a/core/src/raw/oio/write/compose_write.rs b/core/src/raw/oio/write/compose_write.rs index 44c2289bf..991263e98 100644 --- a/core/src/raw/oio/write/compose_write.rs +++ b/core/src/raw/oio/write/compose_write.rs @@ -39,7 +39,6 @@ //! type_alias_impl_trait has been stabilized. use async_trait::async_trait; -use bytes::Bytes; use crate::raw::*; use crate::*; diff --git a/core/src/raw/oio/write/multipart_upload_write.rs b/core/src/raw/oio/write/multipart_upload_write.rs index e5a9aedba..0ae01c157 100644 --- a/core/src/raw/oio/write/multipart_upload_write.rs +++ b/core/src/raw/oio/write/multipart_upload_write.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use crate::raw::*; use crate::*; diff --git a/core/src/raw/oio/write/one_shot_write.rs b/core/src/raw/oio/write/one_shot_write.rs index 5845784d1..857783a51 100644 --- a/core/src/raw/oio/write/one_shot_write.rs +++ b/core/src/raw/oio/write/one_shot_write.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use crate::raw::*; use crate::*; diff --git a/core/src/services/azblob/writer.rs b/core/src/services/azblob/writer.rs index 423449014..b0db2f5bf 100644 --- a/core/src/services/azblob/writer.rs +++ b/core/src/services/azblob/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::AzblobCore; diff --git a/core/src/services/azdfs/writer.rs b/core/src/services/azdfs/writer.rs index 7d1e22c65..d79bf3ea5 100644 --- a/core/src/services/azdfs/writer.rs +++ b/core/src/services/azdfs/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::AzdfsCore; diff --git a/core/src/services/dropbox/writer.rs b/core/src/services/dropbox/writer.rs index 2c51efc41..7c7726937 100644 --- a/core/src/services/dropbox/writer.rs +++ b/core/src/services/dropbox/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::DropboxCore; diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs index b317bf6ab..74285adec 100644 --- a/core/src/services/fs/writer.rs +++ b/core/src/services/fs/writer.rs @@ -15,14 +15,10 @@ // specific language governing permissions and limitations // under the License. -use std::io::Seek; -use std::io::SeekFrom; use std::io::Write; use std::path::PathBuf; use async_trait::async_trait; -use bytes::Bytes; -use tokio::io::AsyncSeekExt; use tokio::io::AsyncWriteExt; use super::error::parse_io_error; @@ -33,7 +29,6 @@ pub struct FsWriter<F> { target_path: PathBuf, tmp_path: Option<PathBuf>, f: F, - pos: u64, } impl<F> FsWriter<F> { @@ -42,7 +37,6 @@ impl<F> FsWriter<F> { target_path, tmp_path, f, - pos: 0, } } } @@ -50,7 +44,7 @@ impl<F> FsWriter<F> { #[async_trait] impl oio::Write for FsWriter<tokio::fs::File> { async fn write(&mut self, bs: &dyn Buf) -> Result<usize> { - self.f.write(&bs.chunk()).await.map_err(parse_io_error) + self.f.write(bs.chunk()).await.map_err(parse_io_error) } async fn abort(&mut self) -> Result<()> { @@ -75,7 +69,7 @@ impl oio::Write for FsWriter<tokio::fs::File> { impl oio::BlockingWrite for FsWriter<std::fs::File> { fn write(&mut self, bs: &dyn Buf) -> Result<usize> { - self.f.write(&bs.chunk()).map_err(parse_io_error) + self.f.write(bs.chunk()).map_err(parse_io_error) } fn close(&mut self) -> Result<()> { diff --git a/core/src/services/ftp/writer.rs b/core/src/services/ftp/writer.rs index d77703c57..7816428cc 100644 --- a/core/src/services/ftp/writer.rs +++ b/core/src/services/ftp/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use futures::AsyncWriteExt; use super::backend::FtpBackend; diff --git a/core/src/services/ghac/writer.rs b/core/src/services/ghac/writer.rs index faad830c2..2c000dba9 100644 --- a/core/src/services/ghac/writer.rs +++ b/core/src/services/ghac/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use super::backend::GhacBackend; use super::error::parse_error; diff --git a/core/src/services/hdfs/writer.rs b/core/src/services/hdfs/writer.rs index 12e4eb557..7a0b07e3c 100644 --- a/core/src/services/hdfs/writer.rs +++ b/core/src/services/hdfs/writer.rs @@ -18,7 +18,6 @@ use std::io::Write; use async_trait::async_trait; -use bytes::Bytes; use futures::AsyncWriteExt; use super::error::parse_io_error; @@ -27,15 +26,11 @@ use crate::*; pub struct HdfsWriter<F> { f: F, - /// The position of current written bytes in the buffer. - /// - /// We will maintain the posstion in pos to make sure the buffer is written correctly. - pos: usize, } impl<F> HdfsWriter<F> { pub fn new(f: F) -> Self { - Self { f, pos: 0 } + Self { f } } } diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs index 7e5199b2a..61c789f1a 100644 --- a/core/src/services/ipmfs/writer.rs +++ b/core/src/services/ipmfs/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::IpmfsBackend; diff --git a/core/src/services/sftp/writer.rs b/core/src/services/sftp/writer.rs index 5ddbd5ea0..eeb887624 100644 --- a/core/src/services/sftp/writer.rs +++ b/core/src/services/sftp/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use openssh_sftp_client::file::File; use crate::raw::oio; diff --git a/core/src/services/vercel_artifacts/writer.rs b/core/src/services/vercel_artifacts/writer.rs index a6d9b24b9..5db1d1a14 100644 --- a/core/src/services/vercel_artifacts/writer.rs +++ b/core/src/services/vercel_artifacts/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::VercelArtifactsBackend; diff --git a/core/src/services/wasabi/writer.rs b/core/src/services/wasabi/writer.rs index b2825291b..6af3d1a23 100644 --- a/core/src/services/wasabi/writer.rs +++ b/core/src/services/wasabi/writer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::core::*; diff --git a/core/src/services/webdav/writer.rs b/core/src/services/webdav/writer.rs index ad9744ea0..ef875a46e 100644 --- a/core/src/services/webdav/writer.rs +++ b/core/src/services/webdav/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::WebdavBackend; diff --git a/core/src/services/webhdfs/writer.rs b/core/src/services/webhdfs/writer.rs index 6357a9e3a..399914e99 100644 --- a/core/src/services/webhdfs/writer.rs +++ b/core/src/services/webhdfs/writer.rs @@ -16,7 +16,6 @@ // under the License. use async_trait::async_trait; -use bytes::Bytes; use http::StatusCode; use super::backend::WebhdfsBackend; diff --git a/core/src/types/writer.rs b/core/src/types/writer.rs index e78c83212..f7f84e465 100644 --- a/core/src/types/writer.rs +++ b/core/src/types/writer.rs @@ -260,8 +260,10 @@ impl AsyncWrite for Writer { let mut w = w .take() .expect("invalid state of writer: Idle state with empty write"); + // FIXME: This will the buf everytime, we should avoid this. + let bs = Bytes::copy_from_slice(buf); let fut = async move { - let n = w.write(&buf).await?; + let n = w.write(&bs).await?; Ok((n, w)) }; self.state = State::Write(Box::pin(fut)); @@ -331,8 +333,10 @@ impl tokio::io::AsyncWrite for Writer { let mut w = w .take() .expect("invalid state of writer: Idle state with empty write"); + // FIXME: This will the buf everytime, we should avoid this. + let bs = Bytes::copy_from_slice(buf); let fut = async move { - let n = w.write(&buf).await?; + let n = w.write(&bs).await?; Ok((n as usize, w)) }; self.state = State::Write(Box::pin(fut)); diff --git a/core/tests/behavior/write.rs b/core/tests/behavior/write.rs index 10c9e8c1a..f37f4a767 100644 --- a/core/tests/behavior/write.rs +++ b/core/tests/behavior/write.rs @@ -994,7 +994,7 @@ pub async fn test_writer_abort(op: Operator) -> Result<()> { } }; - if let Err(e) = writer.write(content).await { + if let Err(e) = writer.write(content.as_slice()).await { assert_eq!(e.kind(), ErrorKind::Unsupported); return Ok(()); } @@ -1120,8 +1120,8 @@ pub async fn test_writer_write(op: Operator) -> Result<()> { let content_b = gen_fixed_bytes(size); let mut w = op.writer(&path).await?; - w.write(content_a.clone()).await?; - w.write(content_b.clone()).await?; + w.write(content_a.as_slice()).await?; + w.write(content_b.as_slice()).await?; w.close().await?; let meta = op.stat(&path).await.expect("stat must succeed");
