This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 0227659d46 refactor(core): Remove unused `size` for `RangeWrite`. (#4755)
0227659d46 is described below
commit 0227659d46eb501fe3b3a68ee33fce275ba7e23d
Author: Weijie Guo <[email protected]>
AuthorDate: Tue Jun 18 15:01:11 2024 +0800
refactor(core): Remove unused `size` for `RangeWrite`. (#4755)
---
core/src/raw/oio/write/range_write.rs | 34 ++++++++++++++--------------------
core/src/services/gcs/writer.rs | 21 ++++++---------------
2 files changed, 20 insertions(+), 35 deletions(-)
diff --git a/core/src/raw/oio/write/range_write.rs b/core/src/raw/oio/write/range_write.rs
index 95b35aae52..67ae619dd9 100644
--- a/core/src/raw/oio/write/range_write.rs
+++ b/core/src/raw/oio/write/range_write.rs
@@ -58,7 +58,7 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
/// RangeWriter will call this API when:
///
/// - All the data has been written to the buffer and we can perform the upload at once.
- fn write_once(&self, size: u64, body: Buffer) -> impl Future<Output = Result<()>> + MaybeSend;
+ fn write_once(&self, body: Buffer) -> impl Future<Output = Result<()>> + MaybeSend;
/// Initiate range the range write, the returning value is the location.
fn initiate_range(&self) -> impl Future<Output = Result<String>> + MaybeSend;
@@ -68,7 +68,6 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
&self,
location: &str,
offset: u64,
- size: u64,
body: Buffer,
) -> impl Future<Output = Result<()>> + MaybeSend;
@@ -77,7 +76,6 @@ pub trait RangeWrite: Send + Sync + Unpin + 'static {
&self,
location: &str,
offset: u64,
- size: u64,
body: Buffer,
) -> impl Future<Output = Result<()>> + MaybeSend;
@@ -119,12 +117,10 @@ impl<W: RangeWrite> RangeWriter<W> {
tasks: ConcurrentTasks::new(executor, concurrent, |input| {
Box::pin(async move {
- let fut = input.w.write_range(
- &input.location,
- input.offset,
- input.bytes.len() as u64,
- input.bytes.clone(),
- );
+ let fut =
+ input
+ .w
+ .write_range(&input.location, input.offset, input.bytes.clone());
match input.executor.timeout() {
None => {
let result = fut.await;
@@ -197,12 +193,9 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
async fn close(&mut self) -> Result<()> {
let Some(location) = self.location.clone() else {
- let (size, body) = match self.cache.clone() {
- Some(cache) => (cache.len(), cache),
- None => (0, Buffer::new()),
- };
+ let body = self.cache.clone().unwrap_or_default();
// Call write_once if there is no data in buffer and no location.
- self.w.write_once(size as u64, body).await?;
+ self.w.write_once(body).await?;
self.cache = None;
return Ok(());
};
@@ -212,9 +205,7 @@ impl<W: RangeWrite> oio::Write for RangeWriter<W> {
if let Some(buffer) = self.cache.clone() {
let offset = self.next_offset;
- self.w
- .complete_range(&location, offset, buffer.len() as u64, buffer)
- .await?;
+ self.w.complete_range(&location, offset, buffer).await?;
self.cache = None;
}
@@ -265,8 +256,9 @@ mod tests {
}
impl RangeWrite for Arc<Mutex<TestWrite>> {
- async fn write_once(&self, size: u64, _: Buffer) -> Result<()> {
+ async fn write_once(&self, body: Buffer) -> Result<()> {
let mut test = self.lock().unwrap();
+ let size = body.len() as u64;
test.length += size;
test.bytes.extend(0..size);
@@ -277,7 +269,7 @@ mod tests {
Ok("test".to_string())
}
- async fn write_range(&self, _: &str, offset: u64, size: u64, _: Buffer) -> Result<()> {
+ async fn write_range(&self, _: &str, offset: u64, body: Buffer) -> Result<()> {
// Add an async sleep here to enforce some pending.
sleep(Duration::from_millis(50)).await;
@@ -289,6 +281,7 @@ mod tests {
}
let mut test = self.lock().unwrap();
+ let size = body.len() as u64;
test.length += size;
let input = (offset..offset + size).collect::<HashSet<_>>();
@@ -302,7 +295,7 @@ mod tests {
Ok(())
}
- async fn complete_range(&self, _: &str, offset: u64, size: u64, _: Buffer) -> Result<()> {
+ async fn complete_range(&self, _: &str, offset: u64, body: Buffer) -> Result<()> {
// Add an async sleep here to enforce some pending.
sleep(Duration::from_millis(50)).await;
@@ -314,6 +307,7 @@ mod tests {
}
let mut test = self.lock().unwrap();
+ let size = body.len() as u64;
test.length += size;
let input = (offset..offset + size).collect::<HashSet<_>>();
diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 9307e331da..bd2827b764 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -43,7 +43,8 @@ impl GcsWriter {
}
impl oio::RangeWrite for GcsWriter {
- async fn write_once(&self, size: u64, body: Buffer) -> Result<()> {
+ async fn write_once(&self, body: Buffer) -> Result<()> {
+ let size = body.len() as u64;
let mut req = self.core.gcs_insert_object_request(
&percent_encode_path(&self.path),
Some(size),
@@ -83,13 +84,8 @@ impl oio::RangeWrite for GcsWriter {
}
}
- async fn write_range(
- &self,
- location: &str,
- written: u64,
- size: u64,
- body: Buffer,
- ) -> Result<()> {
+ async fn write_range(&self, location: &str, written: u64, body: Buffer) -> Result<()> {
+ let size = body.len() as u64;
let mut req = self
.core
.gcs_upload_in_resumable_upload(location, size, written, body)?;
@@ -105,13 +101,8 @@ impl oio::RangeWrite for GcsWriter {
}
}
- async fn complete_range(
- &self,
- location: &str,
- written: u64,
- size: u64,
- body: Buffer,
- ) -> Result<()> {
+ async fn complete_range(&self, location: &str, written: u64, body: Buffer) -> Result<()> {
+ let size = body.len() as u64;
let resp = self
.core
.gcs_complete_resumable_upload(location, written, size, body)