This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch poll-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 977440d1a1f1b3a1d2b3c07cdab27cc9c9606512
Author: Xuanwo <[email protected]>
AuthorDate: Fri Sep 8 15:43:06 2023 +0800

    cleanup gcs
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/gcs/writer.rs | 173 +++++++++++++++++++++-------------------
 1 file changed, 90 insertions(+), 83 deletions(-)

diff --git a/core/src/services/gcs/writer.rs b/core/src/services/gcs/writer.rs
index 2436165c4..c21f6b4e5 100644
--- a/core/src/services/gcs/writer.rs
+++ b/core/src/services/gcs/writer.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::sync::Arc;
+use std::task::{Context, Poll};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -119,94 +120,100 @@ impl GcsWriter {
 #[async_trait]
 impl oio::Write for GcsWriter {
     fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> 
Poll<Result<usize>> {
-        let size = bs.remaining();
-
-        let location = match &self.location {
-            Some(location) => location,
-            None => {
-                if self.op.content_length().unwrap_or_default() == size as u64 
&& self.written == 0
-                {
-                    self.write_oneshot(size as u64, 
AsyncBody::Bytes(bs.copy_to_bytes(size)))
-                        .await?;
-
-                    return Ok(size);
-                } else {
-                    let location = self.initiate_upload().await?;
-                    self.location = Some(location);
-                    self.location.as_deref().unwrap()
-                }
-            }
-        };
-
-        self.buffer.push(bs.copy_to_bytes(size));
-        // Return directly if the buffer is not full
-        if self.buffer.len() <= self.write_fixed_size {
-            return Ok(size);
-        }
-
-        let bs = self.buffer.peak_exact(self.write_fixed_size);
-
-        match self.write_part(location, bs).await {
-            Ok(_) => {
-                self.buffer.take(self.write_fixed_size);
-                self.written += self.write_fixed_size as u64;
-                Ok(size)
-            }
-            Err(e) => {
-                // If the upload fails, we should pop the given bs to make sure
-                // write is re-enter safe.
-                self.buffer.pop();
-                Err(e)
-            }
-        }
+        // let size = bs.remaining();
+        //
+        // let location = match &self.location {
+        //     Some(location) => location,
+        //     None => {
+        //         if self.op.content_length().unwrap_or_default() == size as 
u64 && self.written == 0
+        //         {
+        //             self.write_oneshot(size as u64, 
AsyncBody::Bytes(bs.copy_to_bytes(size)))
+        //                 .await?;
+        //
+        //             return Ok(size);
+        //         } else {
+        //             let location = self.initiate_upload().await?;
+        //             self.location = Some(location);
+        //             self.location.as_deref().unwrap()
+        //         }
+        //     }
+        // };
+        //
+        // self.buffer.push(bs.copy_to_bytes(size));
+        // // Return directly if the buffer is not full
+        // if self.buffer.len() <= self.write_fixed_size {
+        //     return Ok(size);
+        // }
+        //
+        // let bs = self.buffer.peak_exact(self.write_fixed_size);
+        //
+        // match self.write_part(location, bs).await {
+        //     Ok(_) => {
+        //         self.buffer.take(self.write_fixed_size);
+        //         self.written += self.write_fixed_size as u64;
+        //         Ok(size)
+        //     }
+        //     Err(e) => {
+        //         // If the upload fails, we should pop the given bs to make 
sure
+        //         // write is re-enter safe.
+        //         self.buffer.pop();
+        //         Err(e)
+        //     }
+        // }
+
+        todo!()
     }
 
     fn poll_abort(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        let location = if let Some(location) = &self.location {
-            location
-        } else {
-            return Ok(());
-        };
-
-        let resp = self.core.gcs_abort_resumable_upload(location).await?;
-
-        match resp.status().as_u16() {
-            // gcs returns 499 if the upload aborted successfully
-            // reference: 
https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload-json
-            499 => {
-                resp.into_body().consume().await?;
-                self.location = None;
-                self.buffer.clear();
-                Ok(())
-            }
-            _ => Err(parse_error(resp).await?),
-        }
+        // let location = if let Some(location) = &self.location {
+        //     location
+        // } else {
+        //     return Ok(());
+        // };
+        //
+        // let resp = self.core.gcs_abort_resumable_upload(location).await?;
+        //
+        // match resp.status().as_u16() {
+        //     // gcs returns 499 if the upload aborted successfully
+        //     // reference: 
https://cloud.google.com/storage/docs/performing-resumable-uploads#cancel-upload-json
+        //     499 => {
+        //         resp.into_body().consume().await?;
+        //         self.location = None;
+        //         self.buffer.clear();
+        //         Ok(())
+        //     }
+        //     _ => Err(parse_error(resp).await?),
+        // }
+
+        todo!()
     }
 
     fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
-        let location = if let Some(location) = &self.location {
-            location
-        } else {
-            return Ok(());
-        };
-
-        let bs = self.buffer.peak_exact(self.buffer.len());
-
-        let resp = self
-            .core
-            .gcs_complete_resumable_upload(location, self.written, bs)
-            .await?;
-
-        let status = resp.status();
-        match status {
-            StatusCode::OK => {
-                resp.into_body().consume().await?;
-
-                self.location = None;
-                self.buffer.clear();
-                Ok(())
-            }
-            _ => Err(parse_error(resp).await?),
-        }
+        // let location = if let Some(location) = &self.location {
+        //     location
+        // } else {
+        //     return Ok(());
+        // };
+        //
+        // let bs = self.buffer.peak_exact(self.buffer.len());
+        //
+        // let resp = self
+        //     .core
+        //     .gcs_complete_resumable_upload(location, self.written, bs)
+        //     .await?;
+        //
+        // let status = resp.status();
+        // match status {
+        //     StatusCode::OK => {
+        //         resp.into_body().consume().await?;
+        //
+        //         self.location = None;
+        //         self.buffer.clear();
+        //         Ok(())
+        //     }
+        //     _ => Err(parse_error(resp).await?),
+        // }
+
+        todo!()
     }
 }

Reply via email to