WenyXu commented on code in PR #3915:
URL: 
https://github.com/apache/incubator-opendal/pull/3915#discussion_r1442487672


##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +109,119 @@ pub struct MultipartUploadPart {
     pub etag: String,
 }
 
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this 
UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+    type Output = Result<MultipartUploadPart>;
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.get_mut().0.poll_unpin(cx)
+    }
+}
+
+#[derive(Clone)]
+struct WriteTask {
+    part_number: usize,
+    bs: oio::ChunkedBytes,
+}
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this 
WriteTask.
+unsafe impl Send for WriteTask {}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteTask.
+unsafe impl Sync for WriteTask {}
+
 /// MultipartUploadWriter will implements [`Write`] based on multipart
 /// uploads.
 pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
-    state: State<W>,
+    state: State,
+    w: Arc<W>,
 
-    cache: Option<oio::ChunkedBytes>,
     upload_id: Option<Arc<String>>,
     parts: Vec<MultipartUploadPart>,
+    processing_tasks: VecDeque<WriteTask>,
+    pending_tasks: VecDeque<WriteTask>,
+    futures: ConcurrentFutures<UploadFuture>,

Review Comment:
   I'm trying to push a task back to the `pending_tasks` if it fails.
   
   ```rust
   let task = self.processing_tasks.pop_front().unwrap();
           match part {
               Ok(part) => {
                   self.parts.push(part);
                   Ok(())
               }
               Err(err) => {
                   self.pending_tasks.push_front(task);
                   Err(err)
               }
           }
   ```
   
   How about letting the `UploadTask` return the origin `WriteTask`? Then, we 
can push it back to the `pending_tasks` queue.
   
   ```rust
   enum MultipartUploadPartResult {
       Done(...)
       Failed((WriteTask, Error))
   }
   
   impl Future for UploadFuture {
       type Output = MultipartUploadPartResult;
       fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Self::Output> {
           ...
       }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to