This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 9932bd4a4 feat(services/swift): add SLO multipart upload for large 
objects (#7212)
9932bd4a4 is described below

commit 9932bd4a4762f16afdf42797513a2bb4ac881e61
Author: Ben Roeder <[email protected]>
AuthorDate: Sun Feb 22 17:33:25 2026 -0800

    feat(services/swift): add SLO multipart upload for large objects (#7212)
    
    feat(services/swift): support SLO multipart upload via MultipartWrite
    
    Implement oio::MultipartWrite for SwiftWriter using Swift's Static
    Large Object (SLO) mechanism, removing the 5GB single-upload ceiling.
    
    SLO flow:
    - initiate_part: generate a local UUID (no server call needed)
    - write_part: PUT segment to .segments/{path}/{upload_id}/{part:08}
    - complete_part: PUT JSON manifest to {path}?multipart-manifest=put
    - abort_part: list and delete all segments under the upload_id prefix
    
    Changes:
    - Add swift_put_segment, swift_put_slo_manifest, swift_delete_slo,
      slo_segment_path to SwiftCore
    - Add SloManifestEntry serde struct for manifest JSON
    - Replace OneShotWrite with MultipartWrite on SwiftWriter
    - Add an optional `size` field to oio::MultipartPart for manifest assembly
    - Change Writer type from OneShotWriter to MultipartWriter
    - Declare write_can_multi, write_multi_min_size (5MB),
      write_multi_max_size (5GB) capabilities
    - Add uuid dependency for upload ID generation
---
 core/Cargo.lock                                 |   1 +
 core/core/src/raw/oio/write/multipart_write.rs  |   3 +
 core/services/b2/src/writer.rs                  |   1 +
 core/services/cos/src/writer.rs                 |   1 +
 core/services/gcs/src/writer.rs                 |   1 +
 core/services/obs/src/writer.rs                 |   1 +
 core/services/oss/src/writer.rs                 |   1 +
 core/services/s3/src/writer.rs                  |   1 +
 core/services/swift/Cargo.toml                  |   1 +
 core/services/swift/src/backend.rs              |  13 +-
 core/services/swift/src/core.rs                 | 161 ++++++++++++++++++++++++
 core/services/swift/src/writer.rs               |  86 ++++++++++++-
 core/services/upyun/src/writer.rs               |   1 +
 core/services/vercel-blob/src/writer.rs         |   1 +
 integrations/object_store/src/service/writer.rs |   1 +
 15 files changed, 269 insertions(+), 5 deletions(-)

diff --git a/core/Cargo.lock b/core/Cargo.lock
index b4359c518..2c47306c7 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -7051,6 +7051,7 @@ dependencies = [
  "serde",
  "serde_json",
  "tokio",
+ "uuid",
 ]
 
 [[package]]
diff --git a/core/core/src/raw/oio/write/multipart_write.rs 
b/core/core/src/raw/oio/write/multipart_write.rs
index e8b174b18..6ef1ca886 100644
--- a/core/core/src/raw/oio/write/multipart_write.rs
+++ b/core/core/src/raw/oio/write/multipart_write.rs
@@ -118,6 +118,8 @@ pub struct MultipartPart {
     pub etag: String,
     /// The checksum of the part.
     pub checksum: Option<String>,
+    /// The size of the part in bytes.
+    pub size: Option<u64>,
 }
 
 struct WriteInput<W: MultipartWrite> {
@@ -391,6 +393,7 @@ mod tests {
                 part_number,
                 etag: "etag".to_string(),
                 checksum: None,
+                size: None,
             })
         }
 
diff --git a/core/services/b2/src/writer.rs b/core/services/b2/src/writer.rs
index e924a12e1..a3809d0ed 100644
--- a/core/services/b2/src/writer.rs
+++ b/core/services/b2/src/writer.rs
@@ -133,6 +133,7 @@ impl oio::MultipartWrite for B2Writer {
                     etag: result.content_sha1,
                     part_number,
                     checksum: None,
+                    size: None,
                 })
             }
             _ => Err(parse_error(resp)),
diff --git a/core/services/cos/src/writer.rs b/core/services/cos/src/writer.rs
index 0e7348aee..a94986f6c 100644
--- a/core/services/cos/src/writer.rs
+++ b/core/services/cos/src/writer.rs
@@ -141,6 +141,7 @@ impl oio::MultipartWrite for CosWriter {
                     part_number,
                     etag,
                     checksum: None,
+                    size: None,
                 })
             }
             _ => Err(parse_error(resp)),
diff --git a/core/services/gcs/src/writer.rs b/core/services/gcs/src/writer.rs
index 938235355..b8705527e 100644
--- a/core/services/gcs/src/writer.rs
+++ b/core/services/gcs/src/writer.rs
@@ -119,6 +119,7 @@ impl oio::MultipartWrite for GcsWriter {
             part_number,
             etag,
             checksum: None,
+            size: None,
         })
     }
 
diff --git a/core/services/obs/src/writer.rs b/core/services/obs/src/writer.rs
index 6deaf6462..22657d2a4 100644
--- a/core/services/obs/src/writer.rs
+++ b/core/services/obs/src/writer.rs
@@ -136,6 +136,7 @@ impl oio::MultipartWrite for ObsWriter {
                     part_number,
                     etag,
                     checksum: None,
+                    size: None,
                 })
             }
             _ => Err(parse_error(resp)),
diff --git a/core/services/oss/src/writer.rs b/core/services/oss/src/writer.rs
index 7a71a9a08..c03e873dc 100644
--- a/core/services/oss/src/writer.rs
+++ b/core/services/oss/src/writer.rs
@@ -138,6 +138,7 @@ impl oio::MultipartWrite for OssWriter {
                     part_number,
                     etag,
                     checksum: None,
+                    size: None,
                 })
             }
             _ => Err(parse_error(resp)),
diff --git a/core/services/s3/src/writer.rs b/core/services/s3/src/writer.rs
index 5a01e9237..a377130e8 100644
--- a/core/services/s3/src/writer.rs
+++ b/core/services/s3/src/writer.rs
@@ -143,6 +143,7 @@ impl oio::MultipartWrite for S3Writer {
                     part_number,
                     etag,
                     checksum,
+                    size: None,
                 })
             }
             _ => Err(parse_error(resp)),
diff --git a/core/services/swift/Cargo.toml b/core/services/swift/Cargo.toml
index eb2914e0b..0d5b9b02c 100644
--- a/core/services/swift/Cargo.toml
+++ b/core/services/swift/Cargo.toml
@@ -38,6 +38,7 @@ opendal-core = { path = "../../core", version = "0.55.0", 
default-features = fal
 quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
 serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
+uuid = { workspace = true, features = ["v4"] }
 
 [dev-dependencies]
 tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
diff --git a/core/services/swift/src/backend.rs 
b/core/services/swift/src/backend.rs
index 5d5169629..1a6ad309b 100644
--- a/core/services/swift/src/backend.rs
+++ b/core/services/swift/src/backend.rs
@@ -157,6 +157,13 @@ impl Builder for SwiftBuilder {
 
                             write: true,
                             write_can_empty: true,
+                            write_can_multi: true,
+                            write_multi_min_size: Some(5 * 1024 * 1024),
+                            write_multi_max_size: if cfg!(target_pointer_width 
= "64") {
+                                Some(5 * 1024 * 1024 * 1024)
+                            } else {
+                                Some(usize::MAX)
+                            },
                             write_with_content_type: true,
                             write_with_content_disposition: true,
                             write_with_content_encoding: true,
@@ -194,7 +201,7 @@ pub struct SwiftBackend {
 
 impl Access for SwiftBackend {
     type Reader = HttpBody;
-    type Writer = oio::OneShotWriter<SwiftWriter>;
+    type Writer = oio::MultipartWriter<SwiftWriter>;
     type Lister = oio::PageLister<SwiftLister>;
     type Deleter = oio::BatchDeleter<SwiftDeleter>;
 
@@ -236,9 +243,9 @@ impl Access for SwiftBackend {
     }
 
     async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let concurrent = args.concurrent();
         let writer = SwiftWriter::new(self.core.clone(), args.clone(), 
path.to_string());
-
-        let w = oio::OneShotWriter::new(writer);
+        let w = oio::MultipartWriter::new(self.core.info.clone(), writer, 
concurrent);
 
         Ok((RpWrite::default(), w))
     }
diff --git a/core/services/swift/src/core.rs b/core/services/swift/src/core.rs
index b5d702ceb..ccd5d34c5 100644
--- a/core/services/swift/src/core.rs
+++ b/core/services/swift/src/core.rs
@@ -26,6 +26,7 @@ use http::header::IF_MODIFIED_SINCE;
 use http::header::IF_NONE_MATCH;
 use http::header::IF_UNMODIFIED_SINCE;
 use serde::Deserialize;
+use serde::Serialize;
 
 use opendal_core::raw::*;
 use opendal_core::*;
@@ -313,6 +314,153 @@ impl SwiftCore {
 
         self.info.http_client().send(req).await
     }
+
+    /// Build the segment path for an SLO part.
+    ///
+    /// Segments are stored as: 
`.segments/{object_path}/{upload_id}/{part_number:08}`
+    pub fn slo_segment_path(&self, path: &str, upload_id: &str, part_number: 
usize) -> String {
+        let abs = build_abs_path(&self.root, path);
+        format!(
+            ".segments/{}{}/{:08}",
+            abs.trim_end_matches('/'),
+            upload_id,
+            part_number
+        )
+    }
+
+    /// Upload a segment for an SLO multipart upload.
+    ///
+    /// Reference: 
<https://docs.openstack.org/swift/latest/overview_large_objects.html>
+    pub async fn swift_put_segment(
+        &self,
+        path: &str,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: Buffer,
+    ) -> Result<Response<Buffer>> {
+        let segment = self.slo_segment_path(path, upload_id, part_number);
+        let url = format!(
+            "{}/{}/{}",
+            &self.endpoint,
+            &self.container,
+            percent_encode_path(&segment)
+        );
+
+        let mut req = Request::put(&url);
+        req = req.header("X-Auth-Token", &self.token);
+        req = req.header(header::CONTENT_LENGTH, size);
+
+        let req = req
+            .extension(Operation::Write)
+            .body(body)
+            .map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
+    }
+
+    /// Finalize an SLO by uploading the manifest.
+    ///
+    /// PUT {container}/{path}?multipart-manifest=put with a JSON body listing
+    /// each segment's path, etag, and size.
+    ///
+    /// Reference: 
<https://docs.openstack.org/swift/latest/overview_large_objects.html>
+    pub async fn swift_put_slo_manifest(
+        &self,
+        path: &str,
+        manifest: &[SloManifestEntry],
+        args: &OpWrite,
+    ) -> Result<Response<Buffer>> {
+        let abs = build_abs_path(&self.root, path);
+        let url = format!(
+            "{}/{}/{}?multipart-manifest=put",
+            &self.endpoint,
+            &self.container,
+            percent_encode_path(&abs)
+        );
+
+        let body = 
serde_json::to_vec(manifest).map_err(new_json_serialize_error)?;
+
+        let mut req = Request::put(&url);
+        req = req.header("X-Auth-Token", &self.token);
+        req = req.header(header::CONTENT_LENGTH, body.len());
+        req = req.header(header::CONTENT_TYPE, "application/json");
+
+        // Forward user metadata to the manifest object.
+        if let Some(user_metadata) = args.user_metadata() {
+            for (k, v) in user_metadata {
+                req = req.header(format!("X-Object-Meta-{k}"), v);
+            }
+        }
+
+        let req = req
+            .extension(Operation::Write)
+            .body(Buffer::from(bytes::Bytes::from(body)))
+            .map_err(new_request_build_error)?;
+
+        self.info.http_client().send(req).await
+    }
+
+    /// Delete the segments already uploaded for an unfinished SLO upload.
+    ///
+    /// Lists the objects under this upload's segment prefix and issues one DELETE
+    /// per segment. Best effort: a non-success listing or a failed DELETE is ignored.
+    ///
+    /// Reference: <https://docs.openstack.org/swift/latest/overview_large_objects.html>
+    pub async fn swift_delete_slo(&self, path: &str, upload_id: &str) -> 
Result<()> {
+        // List segments under the upload_id prefix and delete them 
individually.
+        // We can't use multipart-manifest=delete because we haven't created
+        // the manifest yet (abort happens before complete).
+        let abs = build_abs_path(&self.root, path);
+        let prefix = format!(".segments/{}{}/", abs.trim_end_matches('/'), 
upload_id);
+
+        // List all segments with this prefix.
+        let url = QueryPairsWriter::new(&format!("{}/{}/", &self.endpoint, 
&self.container))
+            .push("prefix", &percent_encode_path(&prefix))
+            .push("format", "json")
+            .finish();
+
+        let mut req = Request::get(&url);
+        req = req.header("X-Auth-Token", &self.token);
+
+        let req = req
+            .extension(Operation::List)
+            .body(Buffer::new())
+            .map_err(new_request_build_error)?;
+
+        let resp = self.info.http_client().send(req).await?;
+        if !resp.status().is_success() {
+            return Ok(());
+        }
+
+        let bs = resp.into_body().to_bytes();
+        let segments: Vec<ListOpResponse> = 
serde_json::from_slice(&bs).unwrap_or_default();
+
+        // Delete each segment.
+        for seg in segments {
+            if let ListOpResponse::FileInfo { name, .. } = seg {
+                let seg_url = format!(
+                    "{}/{}/{}",
+                    &self.endpoint,
+                    &self.container,
+                    percent_encode_path(&name)
+                );
+
+                let mut req = Request::delete(&seg_url);
+                req = req.header("X-Auth-Token", &self.token);
+
+                let req = req
+                    .extension(Operation::Delete)
+                    .body(Buffer::new())
+                    .map_err(new_request_build_error)?;
+
+                // Best effort — ignore individual segment delete failures.
+                let _ = self.info.http_client().send(req).await;
+            }
+        }
+
+        Ok(())
+    }
 }
 
 #[derive(Debug, Eq, PartialEq, Deserialize)]
@@ -354,6 +502,19 @@ pub struct BulkDeleteResponse {
     pub response_body: Option<String>,
 }
 
+/// Entry in an SLO manifest JSON array.
+///
+/// Reference: 
<https://docs.openstack.org/swift/latest/overview_large_objects.html>
+#[derive(Debug, Serialize)]
+pub struct SloManifestEntry {
+    /// Path to the segment: `/{container}/{segment_name}`
+    pub path: String,
+    /// MD5 etag of the segment (without quotes).
+    pub etag: String,
+    /// Size of the segment in bytes.
+    pub size_bytes: u64,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/core/services/swift/src/writer.rs 
b/core/services/swift/src/writer.rs
index 3f8c63864..5c89a6f0e 100644
--- a/core/services/swift/src/writer.rs
+++ b/core/services/swift/src/writer.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
 
 use http::StatusCode;
 
+use super::core::SloManifestEntry;
 use super::core::SwiftCore;
 use super::error::parse_error;
 use opendal_core::raw::*;
@@ -50,8 +51,8 @@ impl SwiftWriter {
     }
 }
 
-impl oio::OneShotWrite for SwiftWriter {
-    async fn write_once(&self, bs: Buffer) -> Result<Metadata> {
+impl oio::MultipartWrite for SwiftWriter {
+    async fn write_once(&self, _size: u64, bs: Buffer) -> Result<Metadata> {
         let resp = self
             .core
             .swift_create_object(&self.path, bs.len() as u64, &self.op, bs)
@@ -67,4 +68,85 @@ impl oio::OneShotWrite for SwiftWriter {
             _ => Err(parse_error(resp)),
         }
     }
+
+    async fn initiate_part(&self) -> Result<String> {
+        // Swift SLO doesn't need a server-side initiate call.
+        // Generate a local UUID as the upload ID to namespace the segments.
+        Ok(uuid::Uuid::new_v4().to_string())
+    }
+
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: Buffer,
+    ) -> Result<oio::MultipartPart> {
+        let resp = self
+            .core
+            .swift_put_segment(&self.path, upload_id, part_number, size, body)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                let etag = parse_etag(resp.headers())?
+                    .ok_or_else(|| {
+                        Error::new(
+                            ErrorKind::Unexpected,
+                            "ETag not present in segment upload response",
+                        )
+                    })?
+                    .to_string();
+
+                Ok(oio::MultipartPart {
+                    part_number,
+                    etag,
+                    checksum: None,
+                    size: Some(size),
+                })
+            }
+            _ => Err(parse_error(resp)),
+        }
+    }
+
+    async fn complete_part(
+        &self,
+        upload_id: &str,
+        parts: &[oio::MultipartPart],
+    ) -> Result<Metadata> {
+        let manifest: Vec<SloManifestEntry> = parts
+            .iter()
+            .map(|part| {
+                let segment = self
+                    .core
+                    .slo_segment_path(&self.path, upload_id, part.part_number);
+                SloManifestEntry {
+                    path: format!("{}/{}", &self.core.container, segment),
+                    etag: part.etag.trim_matches('"').to_string(),
+                    size_bytes: part.size.unwrap_or(0),
+                }
+            })
+            .collect();
+
+        let resp = self
+            .core
+            .swift_put_slo_manifest(&self.path, &manifest, &self.op)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                let metadata = SwiftWriter::parse_metadata(resp.headers())?;
+                Ok(metadata)
+            }
+            _ => Err(parse_error(resp)),
+        }
+    }
+
+    async fn abort_part(&self, upload_id: &str) -> Result<()> {
+        self.core.swift_delete_slo(&self.path, upload_id).await
+    }
 }
diff --git a/core/services/upyun/src/writer.rs 
b/core/services/upyun/src/writer.rs
index 82a36bb20..fb6ea77f1 100644
--- a/core/services/upyun/src/writer.rs
+++ b/core/services/upyun/src/writer.rs
@@ -95,6 +95,7 @@ impl oio::MultipartWrite for UpyunWriter {
                 part_number,
                 etag: "".to_string(),
                 checksum: None,
+                size: None,
             }),
             _ => Err(parse_error(resp)),
         }
diff --git a/core/services/vercel-blob/src/writer.rs 
b/core/services/vercel-blob/src/writer.rs
index b4bc8eefd..ee602a457 100644
--- a/core/services/vercel-blob/src/writer.rs
+++ b/core/services/vercel-blob/src/writer.rs
@@ -102,6 +102,7 @@ impl oio::MultipartWrite for VercelBlobWriter {
                     part_number,
                     etag: resp.etag,
                     checksum: None,
+                    size: None,
                 })
             }
             _ => Err(parse_error(resp)),
diff --git a/integrations/object_store/src/service/writer.rs 
b/integrations/object_store/src/service/writer.rs
index 9b6c07dc6..4b3c3437a 100644
--- a/integrations/object_store/src/service/writer.rs
+++ b/integrations/object_store/src/service/writer.rs
@@ -156,6 +156,7 @@ impl oio::MultipartWrite for ObjectStoreWriter {
             part_number,
             etag,
             checksum: None, // No checksum for now
+            size: None,
         };
         Ok(multipart_part)
     }

Reply via email to