jiacai2050 commented on code in PR #1541:
URL: https://github.com/apache/horaedb/pull/1541#discussion_r1677287676
##########
src/analytic_engine/src/sst/parquet/writer.rs:
##########
@@ -405,67 +411,129 @@ impl<'a> RecordBatchGroupWriter<'a> {
}
}
-struct ObjectStoreMultiUploadAborter<'a> {
- location: &'a Path,
- session_id: String,
- object_store: &'a ObjectStoreRef,
+struct ObjectStoreMultiUpload {
+ multi_upload: MultipartRef,
+ tasks: FuturesUnordered<UploadPart>,
+ completion_task: Option<BoxFuture<'static, std::result::Result<(),
Error>>>,
}
-impl<'a> ObjectStoreMultiUploadAborter<'a> {
- async fn initialize_upload(
- object_store: &'a ObjectStoreRef,
- location: &'a Path,
- ) -> Result<(
- ObjectStoreMultiUploadAborter<'a>,
- Box<dyn AsyncWrite + Unpin + Send>,
- )> {
- let (session_id, upload_writer) = object_store
+impl<'a> ObjectStoreMultiUpload {
+ async fn new(object_store: &'a ObjectStoreRef, location: &'a Path) ->
Result<Self> {
+ let upload_writer = object_store
.put_multipart(location)
.await
.context(Storage)?;
- let aborter = Self {
- location,
- session_id,
- object_store,
+
+ let multi_upload = Self {
+ multi_upload: Arc::new(Mutex::new(upload_writer)),
+ tasks: FuturesUnordered::new(),
+ completion_task: None,
};
- Ok((aborter, upload_writer))
+
+ Ok(multi_upload)
}
- async fn abort(self) -> Result<()> {
- self.object_store
- .abort_multipart(self.location, &self.session_id)
- .await
- .context(Storage)
+ pub fn aborter(&self) -> MultipartRef {
+ self.multi_upload.clone()
+ }
+
+ pub fn poll_tasks(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::result::Result<(), object_store::ObjectStoreError> {
+ if self.tasks.is_empty() {
+ return Ok(());
+ }
+ while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) {
+ res?;
+ }
+ Ok(())
}
}
-async fn write_metadata<W>(
- mut meta_sink: W,
- parquet_metadata: ParquetMetaData,
- meta_path: &object_store::Path,
-) -> writer::Result<usize>
-where
- W: AsyncWrite + Send + Unpin,
-{
- let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
- let bytes = buf.as_bytes();
- let bytes_size = bytes.len();
- meta_sink.write_all(bytes).await.with_context(|| Io {
- file: meta_path.clone(),
- })?;
+impl AsyncWrite for ObjectStoreMultiUpload {
+ // TODO: Currently,the data writing is serial, and data may need to be
written
Review Comment:
In the older version, was the write process parallel?
If it was, we need to implement that here as well.
##########
src/components/object_store/src/obkv/meta.rs:
##########
@@ -113,13 +116,9 @@ pub struct ObkvObjectMeta {
/// table_name @ path @ upload_id
#[serde(rename = "unique_id")]
pub unique_id: Option<String>,
- /// The size in bytes of one part. Note: maybe the size of last part less
- /// than part_size.
- #[serde(rename = "part_size")]
- pub part_size: usize,
- /// The paths of multi upload parts.
+ /// The paths and size of multi upload parts.
#[serde(rename = "parts")]
- pub parts: Vec<String>,
+ pub parts: Vec<(String, usize)>,
Review Comment:
This struct is serialized, so is updating its fields a breaking change?
If it is, we could define a proto to serialize this struct instead of using
serde.
##########
src/analytic_engine/src/sst/parquet/writer.rs:
##########
@@ -405,67 +411,129 @@ impl<'a> RecordBatchGroupWriter<'a> {
}
}
-struct ObjectStoreMultiUploadAborter<'a> {
- location: &'a Path,
- session_id: String,
- object_store: &'a ObjectStoreRef,
+struct ObjectStoreMultiUpload {
+ multi_upload: MultipartRef,
+ tasks: FuturesUnordered<UploadPart>,
+ completion_task: Option<BoxFuture<'static, std::result::Result<(),
Error>>>,
}
-impl<'a> ObjectStoreMultiUploadAborter<'a> {
- async fn initialize_upload(
- object_store: &'a ObjectStoreRef,
- location: &'a Path,
- ) -> Result<(
- ObjectStoreMultiUploadAborter<'a>,
- Box<dyn AsyncWrite + Unpin + Send>,
- )> {
- let (session_id, upload_writer) = object_store
+impl<'a> ObjectStoreMultiUpload {
+ async fn new(object_store: &'a ObjectStoreRef, location: &'a Path) ->
Result<Self> {
+ let upload_writer = object_store
.put_multipart(location)
.await
.context(Storage)?;
- let aborter = Self {
- location,
- session_id,
- object_store,
+
+ let multi_upload = Self {
+ multi_upload: Arc::new(Mutex::new(upload_writer)),
+ tasks: FuturesUnordered::new(),
+ completion_task: None,
};
- Ok((aborter, upload_writer))
+
+ Ok(multi_upload)
}
- async fn abort(self) -> Result<()> {
- self.object_store
- .abort_multipart(self.location, &self.session_id)
- .await
- .context(Storage)
+ pub fn aborter(&self) -> MultipartRef {
+ self.multi_upload.clone()
+ }
+
+ pub fn poll_tasks(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::result::Result<(), object_store::ObjectStoreError> {
+ if self.tasks.is_empty() {
+ return Ok(());
+ }
+ while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) {
+ res?;
+ }
+ Ok(())
}
}
-async fn write_metadata<W>(
- mut meta_sink: W,
- parquet_metadata: ParquetMetaData,
- meta_path: &object_store::Path,
-) -> writer::Result<usize>
-where
- W: AsyncWrite + Send + Unpin,
-{
- let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
- let bytes = buf.as_bytes();
- let bytes_size = bytes.len();
- meta_sink.write_all(bytes).await.with_context(|| Io {
- file: meta_path.clone(),
- })?;
+impl AsyncWrite for ObjectStoreMultiUpload {
+ // TODO: Currently,the data writing is serial, and data may need to be
written
+ // concurrently.
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> Poll<std::result::Result<usize, Error>> {
+ let buf_size = buf.len();
+ let multi_upload = self.multi_upload.clone();
+
+ let buf = buf.to_vec();
+ let task = async move {
multi_upload.lock().await.put_part(buf.into()).await };
+ self.as_mut().tasks.push(Box::pin(task));
+
+ self.as_mut().poll_tasks(cx)?;
+
+ Poll::Ready(Ok(buf_size))
+ }
+
+ fn poll_flush(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ self.as_mut().poll_tasks(cx)?;
+
+ if self.tasks.is_empty() {
+ return Poll::Ready(Ok(()));
+ }
+ Poll::Pending
+ }
+
+ fn poll_shutdown(
+ mut self: Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<std::result::Result<(), Error>> {
+ self.as_mut().poll_tasks(cx)?;
+
+ if !self.tasks.is_empty() {
+ return Poll::Pending;
+ }
- meta_sink.shutdown().await.with_context(|| Io {
- file: meta_path.clone(),
- })?;
+ let multi_upload = self.multi_upload.clone();
- Ok(bytes_size)
+ let completion_task = self.completion_task.get_or_insert_with(|| {
+ Box::pin(async move {
+ multi_upload.lock().await.complete().await?;
+ Ok(())
+ })
+ });
+
+ Pin::new(completion_task).poll(cx)
+ }
+}
+
+async fn write_metadata(
+ meta_sink: ObjectStoreMultiUpload,
+ parquet_metadata: ParquetMetaData,
+) -> Result<usize> {
+ let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
+ let buf_size = buf.len();
+ meta_sink
+ .multi_upload
+ .lock()
+ .await
+ .put_part(buf.into())
Review Comment:
No need to use `put_part`; we can just write the metadata in a single `put`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]