This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch stream-based-write
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git

commit 6c175925840d2c96859ec516c9b7ae169f2962d3
Author: Xuanwo <[email protected]>
AuthorDate: Wed Aug 30 18:42:05 2023 +0800

    Migrate ipmfs
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/ipmfs/backend.rs |  4 ++--
 core/src/services/ipmfs/writer.rs  | 17 +++++------------
 2 files changed, 7 insertions(+), 14 deletions(-)

diff --git a/core/src/services/ipmfs/backend.rs b/core/src/services/ipmfs/backend.rs
index aebab57e5..c5fa4c7ba 100644
--- a/core/src/services/ipmfs/backend.rs
+++ b/core/src/services/ipmfs/backend.rs
@@ -290,7 +290,7 @@ impl IpmfsBackend {
     pub async fn ipmfs_write(
         &self,
         path: &str,
-        body: Bytes,
+        stream: oio::Streamer,
     ) -> Result<Response<IncomingAsyncBody>> {
         let p = build_rooted_abs_path(&self.root, path);
 
@@ -300,7 +300,7 @@ impl IpmfsBackend {
             percent_encode_path(&p)
         );
 
-        let multipart = Multipart::new().part(FormDataPart::new("data").content(body));
+        let multipart = Multipart::new().part(FormDataPart::new("data").stream(stream));
 
         let req: http::request::Builder = Request::post(url);
         let req = multipart.apply(req)?;
diff --git a/core/src/services/ipmfs/writer.rs b/core/src/services/ipmfs/writer.rs
index 2e2d4c18e..5e5359b37 100644
--- a/core/src/services/ipmfs/writer.rs
+++ b/core/src/services/ipmfs/writer.rs
@@ -34,9 +34,12 @@ impl IpmfsWriter {
     pub fn new(backend: IpmfsBackend, path: String) -> Self {
         IpmfsWriter { backend, path }
     }
+}
 
-    async fn write(&mut self, bs: Bytes) -> Result<()> {
-        let resp = self.backend.ipmfs_write(&self.path, bs).await?;
+#[async_trait]
+impl oio::Write for IpmfsWriter {
+    async fn write(&mut self, s: oio::Streamer) -> Result<()> {
+        let resp = self.backend.ipmfs_write(&self.path, s).await?;
 
         let status = resp.status();
 
@@ -48,16 +51,6 @@ impl IpmfsWriter {
             _ => Err(parse_error(resp).await?),
         }
     }
-}
-
-#[async_trait]
-impl oio::Write for IpmfsWriter {
-    async fn write(&mut self, _s: oio::Streamer) -> Result<()> {
-        Err(Error::new(
-            ErrorKind::Unsupported,
-            "Write::sink is not supported",
-        ))
-    }
 
     async fn abort(&mut self) -> Result<()> {
         Ok(())

Reply via email to