tustvold commented on code in PR #4153:
URL: https://github.com/apache/arrow-rs/pull/4153#discussion_r1180246497
##########
object_store/src/memory.rs:
##########
@@ -329,8 +340,59 @@ impl AsyncWrite for InMemoryUpload {
}
}
+struct InMemoryAppend {
+ location: Path,
+ data: Vec<u8>,
+ storage: StorageType,
+}
+
+impl AsyncWrite for InMemoryAppend {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, io::Error>> {
+ self.data.extend_from_slice(buf);
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), io::Error>> {
+ let storage = StorageType::clone(&self.storage);
+
+ let mut writer = storage.write();
+
+ if let Some((bytes, _)) = writer.remove(&self.location) {
Review Comment:
It might be more idiomatic to use
[`entry`](https://doc.rust-lang.org/std/collections/struct.BTreeMap.html#method.entry)
##########
object_store/src/memory.rs:
##########
@@ -329,8 +340,59 @@ impl AsyncWrite for InMemoryUpload {
}
}
+struct InMemoryAppend {
+ location: Path,
+ data: Vec<u8>,
+ storage: StorageType,
+}
+
+impl AsyncWrite for InMemoryAppend {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<Result<usize, io::Error>> {
+ self.data.extend_from_slice(buf);
+ Poll::Ready(Ok(buf.len()))
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), io::Error>> {
+ let storage = StorageType::clone(&self.storage);
+
+ let mut writer = storage.write();
+
+ if let Some((bytes, _)) = writer.remove(&self.location) {
+ let buf = std::mem::take(&mut self.data);
+ let concat =
Bytes::from_iter(bytes.into_iter().chain(buf.into_iter()));
+ writer.insert(self.location.clone(), (concat, Utc::now()));
+ } else {
+ writer.insert(
+ self.location.clone(),
+ (Bytes::from(std::mem::take(&mut self.data)), Utc::now()),
+ );
+ };
+ Poll::Ready(Ok(()))
+ }
+
+ fn poll_shutdown(
+ self: Pin<&mut Self>,
+ _cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<Result<(), io::Error>> {
+ // does nothing different than flush
+ match self.poll_flush(_cx) {
+ Poll::Ready(_) => Poll::Ready(Ok(())),
+ Poll::Pending => Poll::Pending,
+ }
Review Comment:
```suggestion
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), io::Error>> {
self.poll_flush(cx)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]