Xuanwo commented on code in PR #3915:
URL: https://github.com/apache/incubator-opendal/pull/3915#discussion_r1442443541


##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +109,119 @@ pub struct MultipartUploadPart {
     pub etag: String,
 }
 
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+    type Output = Result<MultipartUploadPart>;
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.get_mut().0.poll_unpin(cx)
+    }
+}
+
+#[derive(Clone)]
+struct WriteTask {
+    part_number: usize,
+    bs: oio::ChunkedBytes,
+}
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this WriteTask.
+unsafe impl Send for WriteTask {}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteTask.
+unsafe impl Sync for WriteTask {}
+
 /// MultipartUploadWriter will implements [`Write`] based on multipart
 /// uploads.
 pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
-    state: State<W>,
+    state: State,
+    w: Arc<W>,
 
-    cache: Option<oio::ChunkedBytes>,
     upload_id: Option<Arc<String>>,
     parts: Vec<MultipartUploadPart>,
+    processing_tasks: VecDeque<WriteTask>,
+    pending_tasks: VecDeque<WriteTask>,
+    futures: ConcurrentFutures<UploadFuture>,
+    part_number: usize,

Review Comment:
   How about renaming this field to `next_part_number`?



##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +109,119 @@ pub struct MultipartUploadPart {
     pub etag: String,
 }
 
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+    type Output = Result<MultipartUploadPart>;
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.get_mut().0.poll_unpin(cx)
+    }
+}
+
+#[derive(Clone)]
+struct WriteTask {
+    part_number: usize,
+    bs: oio::ChunkedBytes,
+}
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this WriteTask.
+unsafe impl Send for WriteTask {}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteTask.
+unsafe impl Sync for WriteTask {}
+
 /// MultipartUploadWriter will implements [`Write`] based on multipart
 /// uploads.
 pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
-    state: State<W>,
+    state: State,
+    w: Arc<W>,
 
-    cache: Option<oio::ChunkedBytes>,
     upload_id: Option<Arc<String>>,
     parts: Vec<MultipartUploadPart>,
+    processing_tasks: VecDeque<WriteTask>,
+    pending_tasks: VecDeque<WriteTask>,
+    futures: ConcurrentFutures<UploadFuture>,

Review Comment:
   I'm a bit confused about why we need both `processing_tasks` and `pending_tasks`. `ConcurrentFutures` itself maintains the order of its futures, so we don't need extra task queues.
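
To illustrate the point about ordering, here is a minimal, std-only sketch of a queue that polls all of its futures but hands results back in submission order. It is not OpenDAL's `ConcurrentFutures` implementation; `OrderedFutures`, `Slot`, `Countdown`, `NoopWake`, `drain_in_order`, and `completed_parts` are hypothetical names used only for illustration, and the busy-polling driver is a simplification of a real executor.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// One slot per submitted future: still running, or finished and waiting its turn.
enum Slot<T> {
    Running(Pin<Box<dyn Future<Output = T>>>),
    Done(T),
}

/// Hypothetical stand-in for an order-preserving queue of futures.
struct OrderedFutures<T> {
    slots: VecDeque<Slot<T>>,
}

impl<T> OrderedFutures<T> {
    fn new() -> Self {
        Self { slots: VecDeque::new() }
    }

    fn push(&mut self, fut: impl Future<Output = T> + 'static) {
        self.slots.push_back(Slot::Running(Box::pin(fut)));
    }

    /// Polls every running future, then yields the front result only if it is ready.
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        for slot in self.slots.iter_mut() {
            let finished = match slot {
                Slot::Running(fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(v) => Some(v),
                    Poll::Pending => None,
                },
                Slot::Done(_) => None,
            };
            if let Some(v) = finished {
                *slot = Slot::Done(v);
            }
        }
        match self.slots.pop_front() {
            None => Poll::Ready(None),
            Some(Slot::Done(v)) => Poll::Ready(Some(v)),
            Some(running) => {
                // The oldest future is not finished yet: put it back and wait.
                self.slots.push_front(running);
                Poll::Pending
            }
        }
    }
}

/// Test future that becomes ready after `remaining` additional polls.
struct Countdown {
    remaining: u32,
    part_number: usize,
}

impl Future for Countdown {
    type Output = usize;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        if self.remaining == 0 {
            Poll::Ready(self.part_number)
        } else {
            self.remaining -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Waker that does nothing; the driver below simply polls in a loop.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Busy-polls the queue until it is empty, collecting results as they are yielded.
fn drain_in_order(mut queue: OrderedFutures<usize>) -> Vec<usize> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    loop {
        match queue.poll_next(&mut cx) {
            Poll::Ready(Some(v)) => out.push(v),
            Poll::Ready(None) => return out,
            Poll::Pending => continue,
        }
    }
}

/// Part 1 finishes first and part 0 last, yet results come back as 0, 1, 2.
fn completed_parts() -> Vec<usize> {
    let mut queue = OrderedFutures::new();
    queue.push(Countdown { remaining: 3, part_number: 0 });
    queue.push(Countdown { remaining: 0, part_number: 1 });
    queue.push(Countdown { remaining: 1, part_number: 2 });
    drain_in_order(queue)
}
```

In this sketch the queue is the only bookkeeping structure: submission order is the slot order, so a writer built on something like it would only need a counter for the next part number rather than separate task queues.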



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

