This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch poll-write in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
commit 977440d1a1f1b3a1d2b3c07cdab27cc9c9606512 Author: Xuanwo <[email protected]> AuthorDate: Fri Sep 8 15:43:06 2023 +0800 cleanup gcs Signed-off-by: Xuanwo <[email protected]> --- core/src/services/gcs/writer.rs | 173 +++++++++++++++++++++------------------- 1 file changed, 90 insertions(+), 83 deletions(-) diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs index 2436165c4..c21f6b4e5 100644 --- a/core/src/services/gcs/writer.rs +++ b/core/src/services/gcs/writer.rs @@ -16,6 +16,7 @@ // under the License. use std::sync::Arc; +use std::task::{Context, Poll}; use async_trait::async_trait; use bytes::Bytes; @@ -119,94 +120,100 @@ impl GcsWriter { #[async_trait] impl oio::Write for GcsWriter { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll<Result<usize>> { - let size = bs.remaining(); - - let location = match &self.location { - Some(location) => location, - None => { - if self.op.content_length().unwrap_or_default() == size as u64 && self.written == 0 - { - self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size))) - .await?; - - return Ok(size); - } else { - let location = self.initiate_upload().await?; - self.location = Some(location); - self.location.as_deref().unwrap() - } - } - }; - - self.buffer.push(bs.copy_to_bytes(size)); - // Return directly if the buffer is not full - if self.buffer.len() <= self.write_fixed_size { - return Ok(size); - } - - let bs = self.buffer.peak_exact(self.write_fixed_size); - - match self.write_part(location, bs).await { - Ok(_) => { - self.buffer.take(self.write_fixed_size); - self.written += self.write_fixed_size as u64; - Ok(size) - } - Err(e) => { - // If the upload fails, we should pop the given bs to make sure - // write is re-enter safe. - self.buffer.pop(); - Err(e) - } - } + // let size = bs.remaining(); + // + // let location = match &self.location { + // Some(location) => location, + // None => { + // if self.op.content_length().unwrap_or_default() == size as u64 && self.written == 0 + // { + // self.write_oneshot(size as u64, AsyncBody::Bytes(bs.copy_to_bytes(size))) + // .await?; + // + // return Ok(size); + // } else { + // let location = self.initiate_upload().await?; + // self.location = Some(location); + // self.location.as_deref().unwrap() + // } + // } + // }; + // + // self.buffer.push(bs.copy_to_bytes(size)); + // // Return directly if the buffer is not full + // if self.buffer.len() <= self.write_fixed_size { + // return Ok(size); + // } + // + // let bs = self.buffer.peak_exact(self.write_fixed_size); + // + // match self.write_part(location, bs).await { + // Ok(_) => { + // self.buffer.take(self.write_fixed_size); + // self.written += self.write_fixed_size as u64; + // Ok(size) + // } + // Err(e) => { + // // If the upload fails, we should pop the given bs to make sure + // // write is re-enter safe. + // self.buffer.pop(); + // Err(e) + // } + // } + + todo!() } fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - let location = if let Some(location) = &self.location { - location - } else { - return Ok(()); - }; - - let resp = self.core.gcs_abort_resumable_upload(location).await?; - - match resp.status().as_u16() { - // gcs returns 499 if the upload aborted successfully - // reference: https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload-json - 499 => { - resp.into_body().consume().await?; - self.location = None; - self.buffer.clear(); - Ok(()) - } - _ => Err(parse_error(resp).await?), - } + // let location = if let Some(location) = &self.location { + // location + // } else { + // return Ok(()); + // }; + // + // let resp = self.core.gcs_abort_resumable_upload(location).await?; + // + // match resp.status().as_u16() { + // // gcs returns 499 if the upload aborted successfully + // // reference: https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload-json + // 499 => { + // resp.into_body().consume().await?; + // self.location = None; + // self.buffer.clear(); + // Ok(()) + // } + // _ => Err(parse_error(resp).await?), + // } + + todo!() } fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> { - let location = if let Some(location) = &self.location { - location - } else { - return Ok(()); - }; - - let bs = self.buffer.peak_exact(self.buffer.len()); - - let resp = self - .core - .gcs_complete_resumable_upload(location, self.written, bs) - .await?; - - let status = resp.status(); - match status { - StatusCode::OK => { - resp.into_body().consume().await?; - - self.location = None; - self.buffer.clear(); - Ok(()) - } - _ => Err(parse_error(resp).await?), - } + // let location = if let Some(location) = &self.location { + // location + // } else { + // return Ok(()); + // }; + // + // let bs = self.buffer.peak_exact(self.buffer.len()); + // + // let resp = self + // .core + // .gcs_complete_resumable_upload(location, self.written, bs) + // .await?; + // + // let status = resp.status(); + // match status { + // StatusCode::OK => { + // resp.into_body().consume().await?; + // + // self.location = None; + // self.buffer.clear(); + // Ok(()) + // } + // _ => Err(parse_error(resp).await?), + // } + + todo!() } }
