Xuanwo commented on code in PR #3915:
URL:
https://github.com/apache/incubator-opendal/pull/3915#discussion_r1442715981
##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +107,116 @@ pub struct MultipartUploadPart {
pub etag: String,
}
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+ type Output = Result<MultipartUploadPart>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.get_mut().0.poll_unpin(cx)
+ }
+}
+
+#[derive(Clone)]
+struct WriteTask {
+ part_number: usize,
+ bs: oio::ChunkedBytes,
+}
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
WriteTask.
+unsafe impl Send for WriteTask {}
Review Comment:
We don't need those. `WriteTask` is `Send + Sync`
##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +107,116 @@ pub struct MultipartUploadPart {
pub etag: String,
}
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
Review Comment:
How about `type UploadFuture = BoxedFuture<Result<MultipartUploadPart>>`?
Seems we don't need a new struct?
And how about using `WritePartFuture` as its name?
##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +107,116 @@ pub struct MultipartUploadPart {
pub etag: String,
}
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+ type Output = Result<MultipartUploadPart>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.get_mut().0.poll_unpin(cx)
+ }
+}
+
+#[derive(Clone)]
Review Comment:
Why do we need `Clone`?
##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +107,116 @@ pub struct MultipartUploadPart {
pub etag: String,
}
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+ type Output = Result<MultipartUploadPart>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.get_mut().0.poll_unpin(cx)
+ }
+}
+
+#[derive(Clone)]
+struct WriteTask {
+ part_number: usize,
+ bs: oio::ChunkedBytes,
+}
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
WriteTask.
+unsafe impl Send for WriteTask {}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteTask.
+unsafe impl Sync for WriteTask {}
+
/// MultipartUploadWriter will implements [`Write`] based on multipart
/// uploads.
pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
- state: State<W>,
+ state: State,
+ w: Arc<W>,
- cache: Option<oio::ChunkedBytes>,
upload_id: Option<Arc<String>>,
parts: Vec<MultipartUploadPart>,
+ pending: Option<WriteTask>,
+ futures: ConcurrentFutures<UploadFuture>,
+ next_part_number: usize,
}
-enum State<W> {
- Idle(Option<W>),
- Init(BoxedFuture<(W, Result<String>)>),
- Write(BoxedFuture<(W, Result<MultipartUploadPart>)>),
- Close(BoxedFuture<(W, Result<()>)>),
- Abort(BoxedFuture<(W, Result<()>)>),
+enum State {
+ Idle,
+ Init(BoxedFuture<Result<String>>),
+ Busy,
+ Close(BoxedFuture<Result<()>>),
+ Abort(BoxedFuture<Result<()>>),
}
/// # Safety
///
/// wasm32 is a special target that we only have one event-loop for this state.
-unsafe impl<S: MultipartUploadWrite> Send for State<S> {}
+unsafe impl Send for State {}
/// # Safety
///
/// We will only take `&mut Self` reference for State.
-unsafe impl<S: MultipartUploadWrite> Sync for State<S> {}
+unsafe impl Sync for State {}
impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
/// Create a new MultipartUploadWriter.
- pub fn new(inner: W) -> Self {
+ pub fn new(inner: W, concurrent: usize) -> Self {
Self {
- state: State::Idle(Some(inner)),
+ state: State::Idle,
- cache: None,
+ w: Arc::new(inner),
upload_id: None,
parts: Vec::new(),
+ pending: None,
+ futures: ConcurrentFutures::new(1.max(concurrent)),
+ next_part_number: 0,
}
}
+
+ /// Increases part number and return the previous part number.
+ fn inc_part_number(&mut self) -> usize {
+ let part_number = self.next_part_number;
+ self.next_part_number += 1;
+ part_number
+ }
+
+ fn add_write_task(&mut self, bs: &dyn oio::WriteBuf) -> usize {
Review Comment:
I feel like we can still use the `cache` concept. And the function could be
called `fill_cache` or something.
##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +107,116 @@ pub struct MultipartUploadPart {
pub etag: String,
}
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+ type Output = Result<MultipartUploadPart>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.get_mut().0.poll_unpin(cx)
+ }
+}
+
+#[derive(Clone)]
+struct WriteTask {
+ part_number: usize,
+ bs: oio::ChunkedBytes,
+}
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
WriteTask.
+unsafe impl Send for WriteTask {}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteTask.
+unsafe impl Sync for WriteTask {}
+
/// MultipartUploadWriter will implements [`Write`] based on multipart
/// uploads.
pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
- state: State<W>,
+ state: State,
+ w: Arc<W>,
- cache: Option<oio::ChunkedBytes>,
upload_id: Option<Arc<String>>,
parts: Vec<MultipartUploadPart>,
+ pending: Option<WriteTask>,
Review Comment:
Is `WriteTask` truly necessary? It appears that using `part_number` and
`oio::ChunkedBytes` would suffice.
##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +107,116 @@ pub struct MultipartUploadPart {
pub etag: String,
}
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+ type Output = Result<MultipartUploadPart>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.get_mut().0.poll_unpin(cx)
+ }
+}
+
+#[derive(Clone)]
+struct WriteTask {
+ part_number: usize,
+ bs: oio::ChunkedBytes,
+}
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
WriteTask.
+unsafe impl Send for WriteTask {}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteTask.
+unsafe impl Sync for WriteTask {}
+
/// MultipartUploadWriter will implements [`Write`] based on multipart
/// uploads.
pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
- state: State<W>,
+ state: State,
+ w: Arc<W>,
- cache: Option<oio::ChunkedBytes>,
upload_id: Option<Arc<String>>,
parts: Vec<MultipartUploadPart>,
+ pending: Option<WriteTask>,
+ futures: ConcurrentFutures<UploadFuture>,
+ next_part_number: usize,
}
-enum State<W> {
- Idle(Option<W>),
- Init(BoxedFuture<(W, Result<String>)>),
- Write(BoxedFuture<(W, Result<MultipartUploadPart>)>),
- Close(BoxedFuture<(W, Result<()>)>),
- Abort(BoxedFuture<(W, Result<()>)>),
+enum State {
+ Idle,
+ Init(BoxedFuture<Result<String>>),
+ Busy,
+ Close(BoxedFuture<Result<()>>),
+ Abort(BoxedFuture<Result<()>>),
}
/// # Safety
///
/// wasm32 is a special target that we only have one event-loop for this state.
-unsafe impl<S: MultipartUploadWrite> Send for State<S> {}
+unsafe impl Send for State {}
/// # Safety
///
/// We will only take `&mut Self` reference for State.
-unsafe impl<S: MultipartUploadWrite> Sync for State<S> {}
+unsafe impl Sync for State {}
impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
/// Create a new MultipartUploadWriter.
- pub fn new(inner: W) -> Self {
+ pub fn new(inner: W, concurrent: usize) -> Self {
Self {
- state: State::Idle(Some(inner)),
+ state: State::Idle,
- cache: None,
+ w: Arc::new(inner),
upload_id: None,
parts: Vec::new(),
+ pending: None,
+ futures: ConcurrentFutures::new(1.max(concurrent)),
+ next_part_number: 0,
}
}
+
+ /// Increases part number and return the previous part number.
+ fn inc_part_number(&mut self) -> usize {
+ let part_number = self.next_part_number;
+ self.next_part_number += 1;
+ part_number
+ }
+
+ fn add_write_task(&mut self, bs: &dyn oio::WriteBuf) -> usize {
+ let size = bs.remaining();
+ let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+ let part_number = self.inc_part_number();
+ self.pending = Some(WriteTask { bs, part_number });
+ size
+ }
+
+ fn process_write_task(&mut self, upload_id: Arc<String>, task: WriteTask) {
Review Comment:
The logic here seems simple, how about expanding it directly?
##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +107,116 @@ pub struct MultipartUploadPart {
pub etag: String,
}
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+ type Output = Result<MultipartUploadPart>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.get_mut().0.poll_unpin(cx)
+ }
+}
+
+#[derive(Clone)]
+struct WriteTask {
+ part_number: usize,
+ bs: oio::ChunkedBytes,
+}
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
WriteTask.
+unsafe impl Send for WriteTask {}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteTask.
+unsafe impl Sync for WriteTask {}
+
/// MultipartUploadWriter will implements [`Write`] based on multipart
/// uploads.
pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
- state: State<W>,
+ state: State,
+ w: Arc<W>,
- cache: Option<oio::ChunkedBytes>,
upload_id: Option<Arc<String>>,
parts: Vec<MultipartUploadPart>,
+ pending: Option<WriteTask>,
+ futures: ConcurrentFutures<UploadFuture>,
+ next_part_number: usize,
}
-enum State<W> {
- Idle(Option<W>),
- Init(BoxedFuture<(W, Result<String>)>),
- Write(BoxedFuture<(W, Result<MultipartUploadPart>)>),
- Close(BoxedFuture<(W, Result<()>)>),
- Abort(BoxedFuture<(W, Result<()>)>),
+enum State {
+ Idle,
+ Init(BoxedFuture<Result<String>>),
+ Busy,
+ Close(BoxedFuture<Result<()>>),
+ Abort(BoxedFuture<Result<()>>),
}
/// # Safety
///
/// wasm32 is a special target that we only have one event-loop for this state.
-unsafe impl<S: MultipartUploadWrite> Send for State<S> {}
+unsafe impl Send for State {}
/// # Safety
///
/// We will only take `&mut Self` reference for State.
-unsafe impl<S: MultipartUploadWrite> Sync for State<S> {}
+unsafe impl Sync for State {}
impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
/// Create a new MultipartUploadWriter.
- pub fn new(inner: W) -> Self {
+ pub fn new(inner: W, concurrent: usize) -> Self {
Self {
- state: State::Idle(Some(inner)),
+ state: State::Idle,
- cache: None,
+ w: Arc::new(inner),
upload_id: None,
parts: Vec::new(),
+ pending: None,
+ futures: ConcurrentFutures::new(1.max(concurrent)),
+ next_part_number: 0,
}
}
+
+ /// Increases part number and return the previous part number.
+ fn inc_part_number(&mut self) -> usize {
+ let part_number = self.next_part_number;
+ self.next_part_number += 1;
+ part_number
+ }
+
+ fn add_write_task(&mut self, bs: &dyn oio::WriteBuf) -> usize {
+ let size = bs.remaining();
+ let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+ let part_number = self.inc_part_number();
+ self.pending = Some(WriteTask { bs, part_number });
Review Comment:
Add `assert` here to make sure `cache` is `None`.
##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -103,44 +107,116 @@ pub struct MultipartUploadPart {
pub etag: String,
}
+struct UploadFuture(BoxedFuture<Result<MultipartUploadPart>>);
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
UploadFuture.
+unsafe impl Send for UploadFuture {}
+
+/// # Safety
+///
+/// We will only take `&mut Self` reference for UploadFuture.
+unsafe impl Sync for UploadFuture {}
+
+impl Future for UploadFuture {
+ type Output = Result<MultipartUploadPart>;
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ self.get_mut().0.poll_unpin(cx)
+ }
+}
+
+#[derive(Clone)]
+struct WriteTask {
+ part_number: usize,
+ bs: oio::ChunkedBytes,
+}
+
+/// # Safety
+///
+/// wasm32 is a special target that we only have one event-loop for this
WriteTask.
+unsafe impl Send for WriteTask {}
+/// # Safety
+///
+/// We will only take `&mut Self` reference for WriteTask.
+unsafe impl Sync for WriteTask {}
+
/// MultipartUploadWriter will implements [`Write`] based on multipart
/// uploads.
pub struct MultipartUploadWriter<W: MultipartUploadWrite> {
- state: State<W>,
+ state: State,
+ w: Arc<W>,
- cache: Option<oio::ChunkedBytes>,
upload_id: Option<Arc<String>>,
parts: Vec<MultipartUploadPart>,
+ pending: Option<WriteTask>,
+ futures: ConcurrentFutures<UploadFuture>,
+ next_part_number: usize,
}
-enum State<W> {
- Idle(Option<W>),
- Init(BoxedFuture<(W, Result<String>)>),
- Write(BoxedFuture<(W, Result<MultipartUploadPart>)>),
- Close(BoxedFuture<(W, Result<()>)>),
- Abort(BoxedFuture<(W, Result<()>)>),
+enum State {
+ Idle,
+ Init(BoxedFuture<Result<String>>),
+ Busy,
+ Close(BoxedFuture<Result<()>>),
+ Abort(BoxedFuture<Result<()>>),
}
/// # Safety
///
/// wasm32 is a special target that we only have one event-loop for this state.
-unsafe impl<S: MultipartUploadWrite> Send for State<S> {}
+unsafe impl Send for State {}
/// # Safety
///
/// We will only take `&mut Self` reference for State.
-unsafe impl<S: MultipartUploadWrite> Sync for State<S> {}
+unsafe impl Sync for State {}
impl<W: MultipartUploadWrite> MultipartUploadWriter<W> {
/// Create a new MultipartUploadWriter.
- pub fn new(inner: W) -> Self {
+ pub fn new(inner: W, concurrent: usize) -> Self {
Self {
- state: State::Idle(Some(inner)),
+ state: State::Idle,
- cache: None,
+ w: Arc::new(inner),
upload_id: None,
parts: Vec::new(),
+ pending: None,
+ futures: ConcurrentFutures::new(1.max(concurrent)),
+ next_part_number: 0,
}
}
+
+ /// Increases part number and return the previous part number.
+ fn inc_part_number(&mut self) -> usize {
+ let part_number = self.next_part_number;
+ self.next_part_number += 1;
+ part_number
+ }
+
+ fn add_write_task(&mut self, bs: &dyn oio::WriteBuf) -> usize {
+ let size = bs.remaining();
+ let bs = oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
+ let part_number = self.inc_part_number();
Review Comment:
It seems `inc_part_number` is only used once; how about expanding it directly?
##########
core/src/raw/oio/write/multipart_upload_write.rs:
##########
@@ -150,60 +226,42 @@ where
fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) ->
Poll<Result<usize>> {
loop {
match &mut self.state {
- State::Idle(w) => {
+ State::Idle => {
match self.upload_id.as_ref() {
Some(upload_id) => {
let upload_id = upload_id.clone();
- let part_number = self.parts.len();
-
- let bs = self.cache.clone().expect("cache must be
valid").clone();
- let w = w.take().expect("writer must be valid");
- self.state = State::Write(Box::pin(async move {
- let size = bs.len();
- let part = w
- .write_part(
- &upload_id,
- part_number,
- size as u64,
- AsyncBody::ChunkedBytes(bs),
- )
- .await;
-
- (w, part)
- }));
+ if self.futures.has_remaining() {
+ let task = self.pending.take().expect("pending
write must exist");
+ self.process_write_task(upload_id, task);
+ let size = self.add_write_task(bs);
+ return Poll::Ready(Ok(size));
+ } else {
+ self.state = State::Busy;
+ }
}
None => {
// Fill cache with the first write.
- if self.cache.is_none() {
- let size = bs.remaining();
- let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
- self.cache = Some(cb);
+ if self.pending.is_none() {
+ let size = self.add_write_task(bs);
return Poll::Ready(Ok(size));
}
- let w = w.take().expect("writer must be valid");
- self.state = State::Init(Box::pin(async move {
- let upload_id = w.initiate_part().await;
- (w, upload_id)
- }));
+ let w = self.w.clone();
+ self.state =
+ State::Init(Box::pin(async move {
w.initiate_part().await }));
}
}
}
State::Init(fut) => {
- let (w, upload_id) = ready!(fut.as_mut().poll(cx));
- self.state = State::Idle(Some(w));
+ let upload_id = ready!(fut.as_mut().poll(cx));
+ self.state = State::Idle;
self.upload_id = Some(Arc::new(upload_id?));
}
- State::Write(fut) => {
- let (w, part) = ready!(fut.as_mut().poll(cx));
- self.state = State::Idle(Some(w));
- self.parts.push(part?);
-
- // Replace the cache when last write succeeded
- let size = bs.remaining();
- let cb =
oio::ChunkedBytes::from_vec(bs.vectored_bytes(size));
- self.cache = Some(cb);
- return Poll::Ready(Ok(size));
+ State::Busy => {
Review Comment:
Can we merge `State::Busy` with `State::Idle`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]