fsdvh commented on issue #6460:
URL: https://github.com/apache/arrow-rs/issues/6460#issuecomment-2422339228

   I managed to reproduce the issue on our staging environment and found the problem that was causing it, even though I can't fully explain it yet.
   
   We have code that uploads data to the S3 store in chunks, using a combination of `MultipartUpload` + `WriteMultipart`.
   
   Here is the [code](https://github.com/apache/arrow-rs/blob/dd5a2294b8b28f768b991e0e89fe7686b296c4ec/object_store/src/aws/mod.rs#L336) for the S3 implementation of `MultipartUpload`:
   
   ```rust
   fn put_part(&mut self, data: PutPayload) -> UploadPart {
       let idx = self.part_idx;
       self.part_idx += 1;
       let state = Arc::clone(&self.state);
       Box::pin(async move {
           let part = state
               .client
               .put_part(&state.location, &state.upload_id, idx, data)
               .await?;
           state.parts.put(idx, part);
           Ok(())
       })
   }
   ```
   
   In short, we synchronously increment the index (the `&mut self` receiver should protect us from races) and only then create the future that writes the part.
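   
   To illustrate the intended semantics, here is a minimal self-contained sketch (with a hypothetical `MockUpload`, not the real S3 client) of the same pattern: the index is assigned at call time, while the async body only runs once the returned future is polled.
   
   ```rust
   use std::future::Future;
   use std::pin::Pin;
   
   struct MockUpload {
       part_idx: usize,
   }
   
   impl MockUpload {
       // Same shape as the real `put_part`: sync prefix, boxed async body.
       fn put_part(&mut self) -> Pin<Box<dyn Future<Output = usize> + Send>> {
           let idx = self.part_idx; // assigned here, at call time
           self.part_idx += 1;
           Box::pin(async move { idx }) // runs only when polled
       }
   }
   
   #[tokio::main]
   async fn main() {
       let mut upload = MockUpload { part_idx: 0 };
       let first = upload.put_part();  // idx 0 is already assigned
       let second = upload.put_part(); // idx 1 is already assigned
       // Awaiting out of order does not change the assigned indices:
       assert_eq!(second.await, 1);
       assert_eq!(first.await, 0);
   }
   ```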
   
   We call this method [here](https://github.com/apache/arrow-rs/blob/dd5a2294b8b28f768b991e0e89fe7686b296c4ec/object_store/src/upload.rs#L211) in `WriteMultipart` when we have a chunk of data to write:
   
   ```rust
   pub(crate) fn put_part(&mut self, part: PutPayload) {
       self.tasks.spawn(self.upload.put_part(part));
   }
   ```
   
   Here we spawn a new task to upload the part to S3, which also assigns the `idx` for that part. Everything looks correct and safe, since we are protected by the `&mut` reference, and in my understanding the synchronous part of `put_part` should execute immediately, without waiting for the future to be polled.
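   
   As a sanity check of that understanding, here is a small sketch I put together (my own example, not from the issue): in Rust the argument expression is evaluated before `spawn` is entered, so the synchronous prefix should run in submission order even when the tasks themselves are polled in any order.
   
   ```rust
   use std::future::Future;
   use std::sync::atomic::{AtomicUsize, Ordering};
   
   static NEXT_IDX: AtomicUsize = AtomicUsize::new(0);
   
   // Synchronous prefix assigns the index; only the async body is deferred.
   fn make_part() -> impl Future<Output = usize> + Send {
       let idx = NEXT_IDX.fetch_add(1, Ordering::SeqCst); // runs at call time
       async move { idx }
   }
   
   #[tokio::main]
   async fn main() {
       // `make_part()` is evaluated before `tokio::spawn` itself runs,
       // so the indices below are assigned in submission order.
       let a = tokio::spawn(make_part());
       let b = tokio::spawn(make_part());
       assert_eq!(a.await.unwrap(), 0);
       assert_eq!(b.await.unwrap(), 1);
   }
   ```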
   
   During the [investigation](https://github.com/coralogix/arrow-rs/pull/54) I added a lot of logging to our fork of object-store and found something very weird. From time to time (I would even say very often) I observed entries like these:
   
   ```
   (last - 1) -> 1729195754539115 before span; 1729195754655545 after span
   last       -> 1729195754539834 before span; 1729195754539847 after span
   ```
   
   In short, even though the last write before the call to `complete` was submitted in the correct order, the spawned upload tasks started executing in a different order, which is fine in general since we hand them off to the runtime scheduler. But weirdly enough, the same applied to the synchronous part of the `put_part` code: the final partition, whose size was below the S3 minimum, was assigned the `last - 1` index, causing the whole upload to fail during completion.
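   
   For context on why that breaks completion, here is a hypothetical illustration (not object_store code) of the constraint S3 enforces at `CompleteMultipartUpload`: every part except the highest-indexed one must be at least 5 MiB (if I recall correctly, violations fail with an `EntityTooSmall` error), so an undersized chunk landing at `last - 1` fails the whole upload.
   
   ```rust
   const MIN_PART_SIZE: usize = 5 * 1024 * 1024;
   
   // Checks the S3 rule: only the final part may be smaller than 5 MiB.
   fn validate_parts(part_sizes: &[usize]) -> Result<(), String> {
       for (idx, &size) in part_sizes.iter().enumerate() {
           let is_final = idx == part_sizes.len() - 1;
           if !is_final && size < MIN_PART_SIZE {
               return Err(format!(
                   "part {idx} is {size} bytes, below the 5 MiB minimum for non-final parts"
               ));
           }
       }
       Ok(())
   }
   
   fn main() {
       // Indices swapped: the undersized final chunk ended up at `last - 1`.
       assert!(validate_parts(&[10 << 20, 1 << 20, 10 << 20]).is_err());
       // Correct order completes fine.
       assert!(validate_parts(&[10 << 20, 10 << 20, 1 << 20]).is_ok());
   }
   ```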
   
   I don't know why this is happening; I will probably spend some more time trying to figure it out. But the fix for the problem is quite simple:
   
   ```rust
   pub(crate) fn put_part(&mut self, part: PutPayload) {
       let task = self.upload.put_part(part);
       self.tasks.spawn(task);
   }
   ```
   
   Creating the task future outside of the `spawn` call guarantees that the proper index is assigned to the part before the task is submitted.
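   
   For reference, this is roughly how the writer is driven from the caller's side (a simplified sketch: the bucket name, path, and chunk size are placeholders, and the exact re-export paths may differ slightly between object_store versions):
   
   ```rust
   use object_store::aws::AmazonS3Builder;
   use object_store::{path::Path, ObjectStore, WriteMultipart};
   
   async fn upload(data: &[u8]) -> object_store::Result<()> {
       let store = AmazonS3Builder::from_env()
           .with_bucket_name("my-bucket") // placeholder
           .build()?;
       let upload = store.put_multipart(&Path::from("staging/data.bin")).await?;
       let mut writer = WriteMultipart::new(upload);
       for chunk in data.chunks(8 * 1024 * 1024) {
           writer.write(chunk); // buffers and spawns `put_part` tasks internally
       }
       writer.finish().await?; // completes the multipart upload
       Ok(())
   }
   ```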
   
   Please let me know what you think @tustvold 🙇 

