This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9932bd4a4 feat(services/swift): add SLO multipart upload for large
objects (#7212)
9932bd4a4 is described below
commit 9932bd4a4762f16afdf42797513a2bb4ac881e61
Author: Ben Roeder <[email protected]>
AuthorDate: Sun Feb 22 17:33:25 2026 -0800
feat(services/swift): add SLO multipart upload for large objects (#7212)
feat(services/swift): support SLO multipart upload via MultipartWrite
Implement oio::MultipartWrite for SwiftWriter using Swift's Static
Large Object (SLO) mechanism, removing the 5GB single-upload ceiling.
SLO flow:
- initiate_part: generate a local UUID (no server call needed)
- write_part: PUT segment to .segments/{path}/{upload_id}/{part:08}
- complete_part: PUT JSON manifest to {path}?multipart-manifest=put
- abort_part: list and delete all segments under the upload_id prefix
Changes:
- Add swift_put_segment, swift_put_slo_manifest, swift_delete_slo,
slo_segment_path to SwiftCore
- Add SloManifestEntry serde struct for manifest JSON
- Replace OneShotWrite with MultipartWrite on SwiftWriter
- Track per-part sizes via a new `size: Option<u64>` field on oio::MultipartPart (other MultipartWrite implementations set it to None) for manifest assembly
- Change Writer type from OneShotWriter to MultipartWriter
- Declare write_can_multi, write_multi_min_size (5 MiB),
write_multi_max_size (5 GiB on 64-bit targets, usize::MAX otherwise) capabilities
- Add uuid dependency for upload ID generation
---
core/Cargo.lock | 1 +
core/core/src/raw/oio/write/multipart_write.rs | 3 +
core/services/b2/src/writer.rs | 1 +
core/services/cos/src/writer.rs | 1 +
core/services/gcs/src/writer.rs | 1 +
core/services/obs/src/writer.rs | 1 +
core/services/oss/src/writer.rs | 1 +
core/services/s3/src/writer.rs | 1 +
core/services/swift/Cargo.toml | 1 +
core/services/swift/src/backend.rs | 13 +-
core/services/swift/src/core.rs | 161 ++++++++++++++++++++++++
core/services/swift/src/writer.rs | 86 ++++++++++++-
core/services/upyun/src/writer.rs | 1 +
core/services/vercel-blob/src/writer.rs | 1 +
integrations/object_store/src/service/writer.rs | 1 +
15 files changed, 269 insertions(+), 5 deletions(-)
diff --git a/core/Cargo.lock b/core/Cargo.lock
index b4359c518..2c47306c7 100644
--- a/core/Cargo.lock
+++ b/core/Cargo.lock
@@ -7051,6 +7051,7 @@ dependencies = [
"serde",
"serde_json",
"tokio",
+ "uuid",
]
[[package]]
diff --git a/core/core/src/raw/oio/write/multipart_write.rs
b/core/core/src/raw/oio/write/multipart_write.rs
index e8b174b18..6ef1ca886 100644
--- a/core/core/src/raw/oio/write/multipart_write.rs
+++ b/core/core/src/raw/oio/write/multipart_write.rs
@@ -118,6 +118,8 @@ pub struct MultipartPart {
pub etag: String,
/// The checksum of the part.
pub checksum: Option<String>,
+ /// The size of the part in bytes.
+ pub size: Option<u64>,
}
struct WriteInput<W: MultipartWrite> {
@@ -391,6 +393,7 @@ mod tests {
part_number,
etag: "etag".to_string(),
checksum: None,
+ size: None,
})
}
diff --git a/core/services/b2/src/writer.rs b/core/services/b2/src/writer.rs
index e924a12e1..a3809d0ed 100644
--- a/core/services/b2/src/writer.rs
+++ b/core/services/b2/src/writer.rs
@@ -133,6 +133,7 @@ impl oio::MultipartWrite for B2Writer {
etag: result.content_sha1,
part_number,
checksum: None,
+ size: None,
})
}
_ => Err(parse_error(resp)),
diff --git a/core/services/cos/src/writer.rs b/core/services/cos/src/writer.rs
index 0e7348aee..a94986f6c 100644
--- a/core/services/cos/src/writer.rs
+++ b/core/services/cos/src/writer.rs
@@ -141,6 +141,7 @@ impl oio::MultipartWrite for CosWriter {
part_number,
etag,
checksum: None,
+ size: None,
})
}
_ => Err(parse_error(resp)),
diff --git a/core/services/gcs/src/writer.rs b/core/services/gcs/src/writer.rs
index 938235355..b8705527e 100644
--- a/core/services/gcs/src/writer.rs
+++ b/core/services/gcs/src/writer.rs
@@ -119,6 +119,7 @@ impl oio::MultipartWrite for GcsWriter {
part_number,
etag,
checksum: None,
+ size: None,
})
}
diff --git a/core/services/obs/src/writer.rs b/core/services/obs/src/writer.rs
index 6deaf6462..22657d2a4 100644
--- a/core/services/obs/src/writer.rs
+++ b/core/services/obs/src/writer.rs
@@ -136,6 +136,7 @@ impl oio::MultipartWrite for ObsWriter {
part_number,
etag,
checksum: None,
+ size: None,
})
}
_ => Err(parse_error(resp)),
diff --git a/core/services/oss/src/writer.rs b/core/services/oss/src/writer.rs
index 7a71a9a08..c03e873dc 100644
--- a/core/services/oss/src/writer.rs
+++ b/core/services/oss/src/writer.rs
@@ -138,6 +138,7 @@ impl oio::MultipartWrite for OssWriter {
part_number,
etag,
checksum: None,
+ size: None,
})
}
_ => Err(parse_error(resp)),
diff --git a/core/services/s3/src/writer.rs b/core/services/s3/src/writer.rs
index 5a01e9237..a377130e8 100644
--- a/core/services/s3/src/writer.rs
+++ b/core/services/s3/src/writer.rs
@@ -143,6 +143,7 @@ impl oio::MultipartWrite for S3Writer {
part_number,
etag,
checksum,
+ size: None,
})
}
_ => Err(parse_error(resp)),
diff --git a/core/services/swift/Cargo.toml b/core/services/swift/Cargo.toml
index eb2914e0b..0d5b9b02c 100644
--- a/core/services/swift/Cargo.toml
+++ b/core/services/swift/Cargo.toml
@@ -38,6 +38,7 @@ opendal-core = { path = "../../core", version = "0.55.0",
default-features = fal
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
+uuid = { workspace = true, features = ["v4"] }
[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
diff --git a/core/services/swift/src/backend.rs
b/core/services/swift/src/backend.rs
index 5d5169629..1a6ad309b 100644
--- a/core/services/swift/src/backend.rs
+++ b/core/services/swift/src/backend.rs
@@ -157,6 +157,13 @@ impl Builder for SwiftBuilder {
write: true,
write_can_empty: true,
+ write_can_multi: true,
+ write_multi_min_size: Some(5 * 1024 * 1024),
+ write_multi_max_size: if cfg!(target_pointer_width
= "64") {
+ Some(5 * 1024 * 1024 * 1024)
+ } else {
+ Some(usize::MAX)
+ },
write_with_content_type: true,
write_with_content_disposition: true,
write_with_content_encoding: true,
@@ -194,7 +201,7 @@ pub struct SwiftBackend {
impl Access for SwiftBackend {
type Reader = HttpBody;
- type Writer = oio::OneShotWriter<SwiftWriter>;
+ type Writer = oio::MultipartWriter<SwiftWriter>;
type Lister = oio::PageLister<SwiftLister>;
type Deleter = oio::BatchDeleter<SwiftDeleter>;
@@ -236,9 +243,9 @@ impl Access for SwiftBackend {
}
+    /// Open a writer for `path`.
+    ///
+    /// `SwiftWriter` implements `oio::MultipartWrite`, so the returned
+    /// `oio::MultipartWriter` can drive either its `write_once` path or its
+    /// SLO segment/manifest path.
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+        // Captured from `args` and handed to `MultipartWriter::new`; presumably
+        // caps how many segment uploads run at once (confirm in multipart_write.rs).
+ let concurrent = args.concurrent();
let writer = SwiftWriter::new(self.core.clone(), args.clone(),
path.to_string());
-
- let w = oio::OneShotWriter::new(writer);
+ let w = oio::MultipartWriter::new(self.core.info.clone(), writer,
concurrent);
Ok((RpWrite::default(), w))
}
diff --git a/core/services/swift/src/core.rs b/core/services/swift/src/core.rs
index b5d702ceb..ccd5d34c5 100644
--- a/core/services/swift/src/core.rs
+++ b/core/services/swift/src/core.rs
@@ -26,6 +26,7 @@ use http::header::IF_MODIFIED_SINCE;
use http::header::IF_NONE_MATCH;
use http::header::IF_UNMODIFIED_SINCE;
use serde::Deserialize;
+use serde::Serialize;
use opendal_core::raw::*;
use opendal_core::*;
@@ -313,6 +314,153 @@ impl SwiftCore {
self.info.http_client().send(req).await
}
+
+ /// Build the segment path for an SLO part.
+ ///
+ /// Segments are stored as:
`.segments/{object_path}/{upload_id}/{part_number:08}`
+ pub fn slo_segment_path(&self, path: &str, upload_id: &str, part_number:
usize) -> String {
+ let abs = build_abs_path(&self.root, path);
+ format!(
+ ".segments/{}{}/{:08}",
+ abs.trim_end_matches('/'),
+ upload_id,
+ part_number
+ )
+ }
+
+ /// Upload a segment for an SLO multipart upload.
+ ///
+ /// Reference:
<https://docs.openstack.org/swift/latest/overview_large_objects.html>
+ pub async fn swift_put_segment(
+ &self,
+ path: &str,
+ upload_id: &str,
+ part_number: usize,
+ size: u64,
+ body: Buffer,
+ ) -> Result<Response<Buffer>> {
+ let segment = self.slo_segment_path(path, upload_id, part_number);
+ let url = format!(
+ "{}/{}/{}",
+ &self.endpoint,
+ &self.container,
+ percent_encode_path(&segment)
+ );
+
+ let mut req = Request::put(&url);
+ req = req.header("X-Auth-Token", &self.token);
+ req = req.header(header::CONTENT_LENGTH, size);
+
+ let req = req
+ .extension(Operation::Write)
+ .body(body)
+ .map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
+ }
+
+ /// Finalize an SLO by uploading the manifest.
+ ///
+ /// PUT {container}/{path}?multipart-manifest=put with a JSON body listing
+ /// each segment's path, etag, and size.
+ ///
+ /// Reference:
<https://docs.openstack.org/swift/latest/overview_large_objects.html>
+ pub async fn swift_put_slo_manifest(
+ &self,
+ path: &str,
+ manifest: &[SloManifestEntry],
+ args: &OpWrite,
+ ) -> Result<Response<Buffer>> {
+ let abs = build_abs_path(&self.root, path);
+ let url = format!(
+ "{}/{}/{}?multipart-manifest=put",
+ &self.endpoint,
+ &self.container,
+ percent_encode_path(&abs)
+ );
+
+ let body =
serde_json::to_vec(manifest).map_err(new_json_serialize_error)?;
+
+ let mut req = Request::put(&url);
+ req = req.header("X-Auth-Token", &self.token);
+ req = req.header(header::CONTENT_LENGTH, body.len());
+ req = req.header(header::CONTENT_TYPE, "application/json");
+
+ // Forward user metadata to the manifest object.
+ if let Some(user_metadata) = args.user_metadata() {
+ for (k, v) in user_metadata {
+ req = req.header(format!("X-Object-Meta-{k}"), v);
+ }
+ }
+
+ let req = req
+ .extension(Operation::Write)
+ .body(Buffer::from(bytes::Bytes::from(body)))
+ .map_err(new_request_build_error)?;
+
+ self.info.http_client().send(req).await
+ }
+
+ /// Delete an SLO manifest and all its segments.
+ ///
+ /// DELETE {container}/{path}?multipart-manifest=delete removes the
manifest
+ /// and all referenced segments in one call.
+ ///
+ /// Reference:
<https://docs.openstack.org/swift/latest/overview_large_objects.html>
+ pub async fn swift_delete_slo(&self, path: &str, upload_id: &str) ->
Result<()> {
+ // List segments under the upload_id prefix and delete them
individually.
+ // We can't use multipart-manifest=delete because we haven't created
+ // the manifest yet (abort happens before complete).
+ let abs = build_abs_path(&self.root, path);
+ let prefix = format!(".segments/{}{}/", abs.trim_end_matches('/'),
upload_id);
+
+ // List all segments with this prefix.
+ let url = QueryPairsWriter::new(&format!("{}/{}/", &self.endpoint,
&self.container))
+ .push("prefix", &percent_encode_path(&prefix))
+ .push("format", "json")
+ .finish();
+
+ let mut req = Request::get(&url);
+ req = req.header("X-Auth-Token", &self.token);
+
+ let req = req
+ .extension(Operation::List)
+ .body(Buffer::new())
+ .map_err(new_request_build_error)?;
+
+ let resp = self.info.http_client().send(req).await?;
+ if !resp.status().is_success() {
+ return Ok(());
+ }
+
+ let bs = resp.into_body().to_bytes();
+ let segments: Vec<ListOpResponse> =
serde_json::from_slice(&bs).unwrap_or_default();
+
+ // Delete each segment.
+ for seg in segments {
+ if let ListOpResponse::FileInfo { name, .. } = seg {
+ let seg_url = format!(
+ "{}/{}/{}",
+ &self.endpoint,
+ &self.container,
+ percent_encode_path(&name)
+ );
+
+ let mut req = Request::delete(&seg_url);
+ req = req.header("X-Auth-Token", &self.token);
+
+ let req = req
+ .extension(Operation::Delete)
+ .body(Buffer::new())
+ .map_err(new_request_build_error)?;
+
+ // Best effort — ignore individual segment delete failures.
+ let _ = self.info.http_client().send(req).await;
+ }
+ }
+
+ Ok(())
+ }
}
#[derive(Debug, Eq, PartialEq, Deserialize)]
@@ -354,6 +502,19 @@ pub struct BulkDeleteResponse {
pub response_body: Option<String>,
}
+/// Entry in an SLO manifest JSON array.
+///
+/// `SwiftCore::swift_put_slo_manifest` serializes a slice of these with
+/// `serde_json`; no serde rename attributes are used, so each field name
+/// below is emitted verbatim as a JSON key.
+///
+/// Reference: <https://docs.openstack.org/swift/latest/overview_large_objects.html>
+#[derive(Debug, Serialize)]
+pub struct SloManifestEntry {
+    /// Path to the segment: `/{container}/{segment_name}`
+    pub path: String,
+    /// MD5 etag of the segment (without quotes).
+    pub etag: String,
+    /// Size of the segment in bytes.
+    pub size_bytes: u64,
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/core/services/swift/src/writer.rs
b/core/services/swift/src/writer.rs
index 3f8c63864..5c89a6f0e 100644
--- a/core/services/swift/src/writer.rs
+++ b/core/services/swift/src/writer.rs
@@ -19,6 +19,7 @@ use std::sync::Arc;
use http::StatusCode;
+use super::core::SloManifestEntry;
use super::core::SwiftCore;
use super::error::parse_error;
use opendal_core::raw::*;
@@ -50,8 +51,8 @@ impl SwiftWriter {
}
}
-impl oio::OneShotWrite for SwiftWriter {
- async fn write_once(&self, bs: Buffer) -> Result<Metadata> {
+impl oio::MultipartWrite for SwiftWriter {
+ async fn write_once(&self, _size: u64, bs: Buffer) -> Result<Metadata> {
let resp = self
.core
.swift_create_object(&self.path, bs.len() as u64, &self.op, bs)
@@ -67,4 +68,85 @@ impl oio::OneShotWrite for SwiftWriter {
_ => Err(parse_error(resp)),
}
}
+
+    async fn initiate_part(&self) -> Result<String> {
+        // SLO has no server-side "initiate" step: a random v4 UUID is enough to
+        // namespace this upload's segments apart from any other upload's.
+        let upload_id = uuid::Uuid::new_v4();
+        Ok(upload_id.to_string())
+    }
+
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: Buffer,
+    ) -> Result<oio::MultipartPart> {
+        let resp = self
+            .core
+            .swift_put_segment(&self.path, upload_id, part_number, size, body)
+            .await?;
+
+        // Anything but 200/201 is surfaced as a Swift error.
+        if !matches!(resp.status(), StatusCode::CREATED | StatusCode::OK) {
+            return Err(parse_error(resp));
+        }
+
+        // The segment's ETag is required later to build its manifest entry.
+        let Some(etag) = parse_etag(resp.headers())? else {
+            return Err(Error::new(
+                ErrorKind::Unexpected,
+                "ETag not present in segment upload response",
+            ));
+        };
+
+        Ok(oio::MultipartPart {
+            part_number,
+            etag: etag.to_string(),
+            checksum: None,
+            size: Some(size),
+        })
+    }
+
+    /// Finish the SLO upload by writing its manifest.
+    ///
+    /// Each part becomes one manifest entry pointing at the segment written
+    /// by `write_part`, carrying that segment's ETag (quotes stripped) and
+    /// size.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::Unexpected` if a part carries no size, and a
+    /// parsed Swift error if the manifest PUT is not answered with 200/201.
+    async fn complete_part(
+        &self,
+        upload_id: &str,
+        parts: &[oio::MultipartPart],
+    ) -> Result<Metadata> {
+        let manifest = parts
+            .iter()
+            .map(|part| {
+                // `write_part` always records the size. Sending a missing one as
+                // `size_bytes: 0` would not match any non-empty segment, so fail
+                // with a clear message instead of a confusing server-side error.
+                let size_bytes = part.size.ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "segment size missing from multipart part",
+                    )
+                    .with_context("part_number", part.part_number.to_string())
+                })?;
+
+                let segment = self
+                    .core
+                    .slo_segment_path(&self.path, upload_id, part.part_number);
+
+                Ok(SloManifestEntry {
+                    // Canonical `/{container}/{object}` form, as documented on
+                    // `SloManifestEntry::path`.
+                    path: format!("/{}/{}", &self.core.container, segment),
+                    etag: part.etag.trim_matches('"').to_string(),
+                    size_bytes,
+                })
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let resp = self
+            .core
+            .swift_put_slo_manifest(&self.path, &manifest, &self.op)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::CREATED | StatusCode::OK => {
+                let metadata = SwiftWriter::parse_metadata(resp.headers())?;
+                Ok(metadata)
+            }
+            _ => Err(parse_error(resp)),
+        }
+    }
+
+    async fn abort_part(&self, upload_id: &str) -> Result<()> {
+        // No manifest exists yet, so aborting means removing this upload's segments.
+        let object_path = self.path.as_str();
+        self.core.swift_delete_slo(object_path, upload_id).await
+    }
}
diff --git a/core/services/upyun/src/writer.rs
b/core/services/upyun/src/writer.rs
index 82a36bb20..fb6ea77f1 100644
--- a/core/services/upyun/src/writer.rs
+++ b/core/services/upyun/src/writer.rs
@@ -95,6 +95,7 @@ impl oio::MultipartWrite for UpyunWriter {
part_number,
etag: "".to_string(),
checksum: None,
+ size: None,
}),
_ => Err(parse_error(resp)),
}
diff --git a/core/services/vercel-blob/src/writer.rs
b/core/services/vercel-blob/src/writer.rs
index b4bc8eefd..ee602a457 100644
--- a/core/services/vercel-blob/src/writer.rs
+++ b/core/services/vercel-blob/src/writer.rs
@@ -102,6 +102,7 @@ impl oio::MultipartWrite for VercelBlobWriter {
part_number,
etag: resp.etag,
checksum: None,
+ size: None,
})
}
_ => Err(parse_error(resp)),
diff --git a/integrations/object_store/src/service/writer.rs
b/integrations/object_store/src/service/writer.rs
index 9b6c07dc6..4b3c3437a 100644
--- a/integrations/object_store/src/service/writer.rs
+++ b/integrations/object_store/src/service/writer.rs
@@ -156,6 +156,7 @@ impl oio::MultipartWrite for ObjectStoreWriter {
part_number,
etag,
checksum: None, // No checksum for now
+ size: None,
};
Ok(multipart_part)
}