jiacai2050 commented on code in PR #1541:
URL: https://github.com/apache/horaedb/pull/1541#discussion_r1721167465


##########
src/analytic_engine/src/sst/parquet/writer.rs:
##########
@@ -405,67 +414,116 @@ impl<'a> RecordBatchGroupWriter<'a> {
     }
 }
 
-struct ObjectStoreMultiUploadAborter<'a> {
-    location: &'a Path,
-    session_id: String,
-    object_store: &'a ObjectStoreRef,
+struct ObjectStoreMultiUpload {
+    multi_upload: WriteMultipartRef,
+    upload_task: Option<BoxFuture<'static, std::result::Result<usize, Error>>>,
+    flush_task: Option<BoxFuture<'static, std::result::Result<(), Error>>>,
+    completion_task: Option<BoxFuture<'static, std::result::Result<(), Error>>>,
 }
 
-impl<'a> ObjectStoreMultiUploadAborter<'a> {
-    async fn initialize_upload(
-        object_store: &'a ObjectStoreRef,
-        location: &'a Path,
-    ) -> Result<(
-        ObjectStoreMultiUploadAborter<'a>,
-        Box<dyn AsyncWrite + Unpin + Send>,
-    )> {
-        let (session_id, upload_writer) = object_store
+impl<'a> ObjectStoreMultiUpload {
+    async fn new(object_store: &'a ObjectStoreRef, location: &'a Path) -> Result<Self> {
+        let upload_writer = object_store
             .put_multipart(location)
             .await
             .context(Storage)?;
-        let aborter = Self {
-            location,
-            session_id,
-            object_store,
+
+        let multi_upload = Arc::new(Mutex::new(WriteMultipart::new(upload_writer, CHUNK_SIZE)));
+
+        let multi_upload = Self {
+            multi_upload,
+            upload_task: None,
+            flush_task: None,
+            completion_task: None,
         };
-        Ok((aborter, upload_writer))
+
+        Ok(multi_upload)
     }
 
-    async fn abort(self) -> Result<()> {
-        self.object_store
-            .abort_multipart(self.location, &self.session_id)
-            .await
-            .context(Storage)
+    pub fn aborter(&self) -> WriteMultipartRef {
+        self.multi_upload.clone()
     }
 }
 
-async fn write_metadata<W>(
-    mut meta_sink: W,
-    parquet_metadata: ParquetMetaData,
-    meta_path: &object_store::Path,
-) -> writer::Result<usize>
-where
-    W: AsyncWrite + Send + Unpin,
-{
-    let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
-    let bytes = buf.as_bytes();
-    let bytes_size = bytes.len();
-    meta_sink.write_all(bytes).await.with_context(|| Io {
-        file: meta_path.clone(),
-    })?;
+impl AsyncWrite for ObjectStoreMultiUpload {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        let multi_upload = self.multi_upload.clone();
+        let buf = buf.to_owned();
+
+        let upload_task = self.upload_task.insert(
+            async move {
+                multi_upload.lock().await.flush(MAX_CONCURRENCY).await?;
+                multi_upload.lock().await.write(&buf);
+                Ok(buf.len())
+            }
+            .boxed(),
+        );
+
+        Pin::new(upload_task).poll(cx)
+    }
+
+    fn poll_flush(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        let multi_upload = self.multi_upload.clone();
 
-    meta_sink.shutdown().await.with_context(|| Io {
-        file: meta_path.clone(),
-    })?;
+        let flush_task = self.flush_task.insert(
+            async move {
+                multi_upload.lock().await.flush(0).await?;
+                Ok(())
+            }
+            .boxed(),
+        );
+
+        Pin::new(flush_task).poll(cx)
+    }
 
-    Ok(bytes_size)
+    fn poll_shutdown(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        let multi_upload = self.multi_upload.clone();
+
+        let completion_task = self.completion_task.get_or_insert_with(|| {
+            async move {
+                multi_upload.lock().await.finish().await?;
+                Ok(())
+            }
+            .boxed()
+        });
+
+        Pin::new(completion_task).poll(cx)
+    }
+}
+
+async fn write_metadata(
+    meta_sink: ObjectStoreMultiUpload,
+    parquet_metadata: ParquetMetaData,
+) -> Result<usize> {
+    let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
+    let buf_size = buf.len();
+    meta_sink.multi_upload.lock().await.put(buf);

Review Comment:
   It seems we only need to acquire the lock once here, rather than locking separately for `put` and `finish`. Something like:
   ```rust
   let mut upload = meta_sink.multi_upload.lock().await;
   upload.put(buf);
   upload.finish().await?;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to