fsdvh opened a new issue, #276: URL: https://github.com/apache/arrow-rs-object-store/issues/276
**Describe the bug** So recently we started seeing two issues: - multiple shutdowns of the same writer will cause an issue, related to [this](https://github.com/apache/arrow-rs/blob/50e9e4927ed4d763c76227c588aa324aa173032e/object_store/src/buffered.rs#L220 - partition size issue, related to [this](https://github.com/apache/arrow-rs/blob/50e9e4927ed4d763c76227c588aa324aa173032e/object_store/src/upload.rs#L120) # Multiple Shutdown First of all, I want to note that multiple shutdown calls to the same writer are the issue by itself, but I think we can make the situation better with minimum effort. Here is the code: ```rust BufWriterState::Write(x) => { let upload = x.take().ok_or_else(|| { std::io::Error::new( ErrorKind::InvalidInput, "Cannot shutdown a writer that has already been shut down", ) })?; self.state = BufWriterState::Flush( async move { upload.finish().await?; Ok(()) } .boxed(), ) } ``` I think we can change it to something more friendly like this: ```rust BufWriterState::Write(x) => { if let Some(upload) = x.take() { self.state = BufWriterState::Flush( async move { upload.finish().await.map(|_| ()) }.boxed(), ) } else { return Poll::Ready(Ok(())); } } ``` This way on a second shutdown call we just immediately return `Ok(())` # Upload part size issue Something leftover during the shutdown, complete before the previous upload, in this case, we're getting: ``` Your proposed upload is smaller than the minimum allowed size ``` To mitigate this issue we probably should wait for all previous part uploads to complete and then upload the final part which may be smaller than the minimum size of the last one. Here is the original code I propose to change: ```rust pub async fn finish(mut self) -> Result<PutResult> { if !self.buffer.is_empty() { let part = std::mem::take(&mut self.buffer); self.put_part(part.into()) } self.wait_for_capacity(0).await?; match self.upload.complete().await { Err(e) => { self.tasks.shutdown().await; self.upload.abort().await?; Err(e) } Ok(result) => Ok(result), } } ``` by injecting `self.wait_for_capacity(0).await?;` before actually putting the last chunk we can mitigate this issue. ```rust pub async fn finish(mut self) -> Result<PutResult> { if !self.buffer.is_empty() { self.wait_for_capacity(0).await?; // here let part = std::mem::take(&mut self.buffer); self.put_part(part.into()) } self.wait_for_capacity(0).await?; match self.upload.complete().await { Err(e) => { self.tasks.shutdown().await; self.upload.abort().await?; Err(e) } Ok(result) => Ok(result), } } ``` This way we wait for all ongoing uploads before submitting the last part -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org