This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 8eeec4488 refactor(core)!: Remove the range writer that has never been used (#5323)
8eeec4488 is described below

commit 8eeec448831f71e4c52127c52362561afbe762a8
Author: Xuanwo <[email protected]>
AuthorDate: Thu Nov 14 21:26:47 2024 +0800

    refactor(core)!: Remove the range writer that has never been used (#5323)
---
 core/src/raw/oio/write/mod.rs         |   4 -
 core/src/raw/oio/write/range_write.rs | 365 ----------------------------------
 2 files changed, 369 deletions(-)

diff --git a/core/src/raw/oio/write/mod.rs b/core/src/raw/oio/write/mod.rs
index bd1ae5ae9..9ce7d9427 100644
--- a/core/src/raw/oio/write/mod.rs
+++ b/core/src/raw/oio/write/mod.rs
@@ -34,10 +34,6 @@ mod one_shot_write;
 pub use one_shot_write::OneShotWrite;
 pub use one_shot_write::OneShotWriter;
 
-mod range_write;
-pub use range_write::RangeWrite;
-pub use range_write::RangeWriter;
-
 mod block_write;
 pub use block_write::BlockWrite;
 pub use block_write::BlockWriter;
diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs
deleted file mode 100644
index 905d850ba..000000000
--- a/core/src/raw/oio/write/range_write.rs
+++ /dev/null
@@ -1,365 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::sync::Arc;
-
-use futures::select;
-use futures::Future;
-use futures::FutureExt;
-
-use crate::raw::*;
-use crate::*;
-
-/// RangeWrite is used to implement [`oio::Write`] based on range write.
-///
-/// # Services
-///
-/// Services like gcs support range write via [GCS Resumable 
Upload](https://cloud.google.com/storage/docs/resumable-uploads).
-///
-/// GCS will support upload content by specifying the range of the file in 
`CONTENT-RANGE`.
-///
-/// Most range based services will have the following limitations:
-///
-/// - The size of chunk per upload must be aligned to a certain size. For 
example, GCS requires
-///   to align with 256KiB.
-/// - Some services requires to complete the write at the last chunk with the 
total size.
-///
-/// # Architecture
-///
-/// The architecture after adopting [`RangeWrite`]:
-///
-/// - Services impl `RangeWrite`
-/// - `RangeWriter` impl `Write`
-/// - Expose `RangeWriter` as `Accessor::Writer`
-///
-/// # Requirements
-///
-/// Services that implement `RangeWrite` must fulfill the following 
requirements:
-///
-/// - Must be a http service that could accept `AsyncBody`.
-/// - Need initialization before writing.
-/// - Writing data based on range: `offset`, `size`.
-pub trait RangeWrite: Send + Sync + Unpin + 'static {
-    /// write_once is used to write the data to underlying storage at once.
-    ///
-    /// RangeWriter will call this API when:
-    ///
-    /// - All the data has been written to the buffer and we can perform the 
upload at once.
-    fn write_once(&self, body: Buffer) -> impl Future<Output = Result<()>> + 
MaybeSend;
-
-    /// Initiate range the range write, the returning value is the location.
-    fn initiate_range(&self) -> impl Future<Output = Result<String>> + 
MaybeSend;
-
-    /// write_range will write a range of data.
-    fn write_range(
-        &self,
-        location: &str,
-        offset: u64,
-        body: Buffer,
-    ) -> impl Future<Output = Result<()>> + MaybeSend;
-
-    /// complete_range will complete the range write by uploading the last 
chunk.
-    fn complete_range(
-        &self,
-        location: &str,
-        offset: u64,
-        body: Buffer,
-    ) -> impl Future<Output = Result<()>> + MaybeSend;
-
-    /// abort_range will abort the range write by abort all already uploaded 
data.
-    fn abort_range(&self, location: &str) -> impl Future<Output = Result<()>> 
+ MaybeSend;
-}
-
-struct WriteInput<W: RangeWrite> {
-    w: Arc<W>,
-    executor: Executor,
-
-    location: Arc<String>,
-    offset: u64,
-    bytes: Buffer,
-}
-
-/// RangeWriter will implements [`oio::Write`] based on range write.
-pub struct RangeWriter<W: RangeWrite> {
-    w: Arc<W>,
-    executor: Executor,
-
-    location: Option<Arc<String>>,
-    next_offset: u64,
-    cache: Option<Buffer>,
-    tasks: ConcurrentTasks<WriteInput<W>, ()>,
-}
-
-impl<W: RangeWrite> RangeWriter<W> {
-    /// Create a new MultipartWriter.
-    pub fn new(inner: W, executor: Option<Executor>, concurrent: usize) -> 
Self {
-        let executor = executor.unwrap_or_default();
-
-        Self {
-            w: Arc::new(inner),
-            executor: executor.clone(),
-            location: None,
-            next_offset: 0,
-            cache: None,
-
-            tasks: ConcurrentTasks::new(executor, concurrent, |input| {
-                Box::pin(async move {
-                    let fut =
-                        input
-                            .w
-                            .write_range(&input.location, input.offset, 
input.bytes.clone());
-                    match input.executor.timeout() {
-                        None => {
-                            let result = fut.await;
-                            (input, result)
-                        }
-                        Some(timeout) => {
-                            let result = select! {
-                                result = fut.fuse() => {
-                                    result
-                                }
-                                _ = timeout.fuse() => {
-                                      Err(Error::new(
-                                            ErrorKind::Unexpected, "write 
range timeout")
-                                                .with_context("offset", 
input.offset.to_string())
-                                                .set_temporary())
-                                }
-                            };
-                            (input, result)
-                        }
-                    }
-                })
-            }),
-        }
-    }
-
-    fn fill_cache(&mut self, bs: Buffer) -> usize {
-        let size = bs.len();
-        assert!(self.cache.is_none());
-        self.cache = Some(bs);
-        size
-    }
-}
-
-impl<W: RangeWrite> oio::Write for RangeWriter<W> {
-    async fn write(&mut self, bs: Buffer) -> Result<()> {
-        let location = match self.location.clone() {
-            Some(location) => location,
-            None => {
-                // Fill cache with the first write.
-                if self.cache.is_none() {
-                    self.fill_cache(bs);
-                    return Ok(());
-                }
-
-                let location = self.w.initiate_range().await?;
-                let location = Arc::new(location);
-                self.location = Some(location.clone());
-                location
-            }
-        };
-
-        let bytes = self.cache.clone().expect("pending write must exist");
-        let length = bytes.len() as u64;
-        let offset = self.next_offset;
-
-        self.tasks
-            .execute(WriteInput {
-                w: self.w.clone(),
-                executor: self.executor.clone(),
-                location,
-                offset,
-                bytes,
-            })
-            .await?;
-        self.cache = None;
-        self.next_offset += length;
-        self.fill_cache(bs);
-        Ok(())
-    }
-
-    async fn close(&mut self) -> Result<()> {
-        let Some(location) = self.location.clone() else {
-            let body = self.cache.clone().unwrap_or_default();
-            // Call write_once if there is no data in buffer and no location.
-            self.w.write_once(body).await?;
-            self.cache = None;
-            return Ok(());
-        };
-
-        // Make sure all tasks are finished.
-        while self.tasks.next().await.transpose()?.is_some() {}
-
-        if let Some(buffer) = self.cache.clone() {
-            let offset = self.next_offset;
-            self.w.complete_range(&location, offset, buffer).await?;
-            self.cache = None;
-        }
-
-        Ok(())
-    }
-
-    async fn abort(&mut self) -> Result<()> {
-        let Some(location) = self.location.clone() else {
-            return Ok(());
-        };
-
-        self.tasks.clear();
-        self.cache = None;
-        self.w.abort_range(&location).await?;
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::collections::HashSet;
-    use std::sync::Mutex;
-    use std::time::Duration;
-
-    use pretty_assertions::assert_eq;
-    use rand::thread_rng;
-    use rand::Rng;
-    use rand::RngCore;
-    use tokio::time::sleep;
-
-    use super::*;
-    use crate::raw::oio::Write;
-
-    struct TestWrite {
-        length: u64,
-        bytes: HashSet<u64>,
-    }
-
-    impl TestWrite {
-        pub fn new() -> Arc<Mutex<Self>> {
-            let v = Self {
-                bytes: HashSet::new(),
-                length: 0,
-            };
-
-            Arc::new(Mutex::new(v))
-        }
-    }
-
-    impl RangeWrite for Arc<Mutex<TestWrite>> {
-        async fn write_once(&self, body: Buffer) -> Result<()> {
-            let mut test = self.lock().unwrap();
-            let size = body.len() as u64;
-            test.length += size;
-            test.bytes.extend(0..size);
-
-            Ok(())
-        }
-
-        async fn initiate_range(&self) -> Result<String> {
-            Ok("test".to_string())
-        }
-
-        async fn write_range(&self, _: &str, offset: u64, body: Buffer) -> 
Result<()> {
-            // Add an async sleep here to enforce some pending.
-            sleep(Duration::from_millis(50)).await;
-
-            // We will have 10% percent rate for write part to fail.
-            if thread_rng().gen_bool(1.0 / 10.0) {
-                return Err(
-                    Error::new(ErrorKind::Unexpected, "I'm a crazy 
monkey!").set_temporary()
-                );
-            }
-
-            let mut test = self.lock().unwrap();
-            let size = body.len() as u64;
-            test.length += size;
-
-            let input = (offset..offset + size).collect::<HashSet<_>>();
-
-            assert!(
-                test.bytes.is_disjoint(&input),
-                "input should not have overlap"
-            );
-            test.bytes.extend(input);
-
-            Ok(())
-        }
-
-        async fn complete_range(&self, _: &str, offset: u64, body: Buffer) -> 
Result<()> {
-            // Add an async sleep here to enforce some pending.
-            sleep(Duration::from_millis(50)).await;
-
-            // We will have 10% percent rate for write part to fail.
-            if thread_rng().gen_bool(1.0 / 10.0) {
-                return Err(
-                    Error::new(ErrorKind::Unexpected, "I'm a crazy 
monkey!").set_temporary()
-                );
-            }
-
-            let mut test = self.lock().unwrap();
-            let size = body.len() as u64;
-            test.length += size;
-
-            let input = (offset..offset + size).collect::<HashSet<_>>();
-            assert!(
-                test.bytes.is_disjoint(&input),
-                "input should not have overlap"
-            );
-            test.bytes.extend(input);
-
-            Ok(())
-        }
-
-        async fn abort_range(&self, _: &str) -> Result<()> {
-            Ok(())
-        }
-    }
-
-    #[tokio::test]
-    async fn test_range_writer_with_concurrent_errors() {
-        let mut rng = thread_rng();
-
-        let mut w = RangeWriter::new(TestWrite::new(), Some(Executor::new()), 
200);
-        let mut total_size = 0u64;
-
-        for _ in 0..1000 {
-            let size = rng.gen_range(1..1024);
-            total_size += size as u64;
-
-            let mut bs = vec![0; size];
-            rng.fill_bytes(&mut bs);
-
-            loop {
-                match w.write(bs.clone().into()).await {
-                    Ok(_) => break,
-                    Err(_) => continue,
-                }
-            }
-        }
-
-        loop {
-            match w.close().await {
-                Ok(_) => break,
-                Err(_) => continue,
-            }
-        }
-
-        let actual_bytes = w.w.lock().unwrap().bytes.clone();
-        let expected_bytes: HashSet<_> = (0..total_size).collect();
-        assert_eq!(actual_bytes, expected_bytes);
-
-        let actual_size = w.w.lock().unwrap().length;
-        assert_eq!(actual_size, total_size);
-    }
-}

Reply via email to