This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 0f6738490 feat(services/lakefs): Implement write returns metadata (#6770)
0f6738490 is described below
commit 0f6738490dc1d21d0e3db4bd6f48621e676a9f86
Author: Kingsword <[email protected]>
AuthorDate: Mon Nov 10 16:16:45 2025 +0800
feat(services/lakefs): Implement write returns metadata (#6770)
---
core/src/services/lakefs/backend.rs | 15 ++++-----------
core/src/services/lakefs/core.rs | 35 +++++++++++++++++++++++++++++++++++
core/src/services/lakefs/writer.rs | 34 +++++++++++++++++++++++++++++++++-
3 files changed, 72 insertions(+), 12 deletions(-)
diff --git a/core/src/services/lakefs/backend.rs b/core/src/services/lakefs/backend.rs
index f59cd3dec..c4ffc6b6e 100644
--- a/core/src/services/lakefs/backend.rs
+++ b/core/src/services/lakefs/backend.rs
@@ -210,20 +210,13 @@ impl Access for LakefsBackend {
match status {
StatusCode::OK => {
- let mut meta = parse_into_metadata(path, resp.headers())?;
- let bs = resp.clone().into_body();
+ let bs = resp.into_body();
let decoded_response: LakefsStatus =
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
- if let Some(size_bytes) = decoded_response.size_bytes {
- meta.set_content_length(size_bytes);
- }
- meta.set_mode(EntryMode::FILE);
- if let Some(v) = parse_content_disposition(resp.headers())? {
- meta.set_content_disposition(v);
- }
-
- meta.set_last_modified(Timestamp::from_second(decoded_response.mtime).unwrap());
+
+ // Use the helper function to parse LakefsStatus into Metadata
+ let meta = LakefsCore::parse_lakefs_status_into_metadata(&decoded_response);
Ok(RpStat::new(meta))
}
diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs
index 95008ad92..86051acd4 100644
--- a/core/src/services/lakefs/core.rs
+++ b/core/src/services/lakefs/core.rs
@@ -233,6 +233,41 @@ impl LakefsCore {
.map_err(new_request_build_error)?;
self.info.http_client().send(req).await
}
+
+ /// Parse LakefsStatus into Metadata
+ pub fn parse_lakefs_status_into_metadata(status: &LakefsStatus) -> Metadata {
+ // Determine entry mode based on path_type
+ // "common_prefix" indicates a directory in list operations
+ let mode = if status.path_type == "common_prefix" {
+ EntryMode::DIR
+ } else {
+ EntryMode::FILE
+ };
+
+ let mut meta = Metadata::new(mode);
+
+ // Set checksum as etag
+ if !status.checksum.is_empty() {
+ meta.set_etag(&status.checksum);
+ }
+
+ // Set content length
+ if let Some(size) = status.size_bytes {
+ meta.set_content_length(size);
+ }
+
+ // Set content type
+ if let Some(ref content_type) = status.content_type {
+ meta.set_content_type(content_type);
+ }
+
+ // Set last modified time
+ if let Ok(timestamp) = Timestamp::from_second(status.mtime) {
+ meta.set_last_modified(timestamp);
+ }
+
+ meta
+ }
}
#[derive(Deserialize, Eq, PartialEq, Debug)]
diff --git a/core/src/services/lakefs/writer.rs b/core/src/services/lakefs/writer.rs
index c45805756..8699cc07e 100644
--- a/core/src/services/lakefs/writer.rs
+++ b/core/src/services/lakefs/writer.rs
@@ -17,9 +17,11 @@
use std::sync::Arc;
+use bytes::Buf;
use http::StatusCode;
use super::core::LakefsCore;
+use super::core::LakefsStatus;
use super::error::parse_error;
use crate::raw::*;
use crate::*;
@@ -43,7 +45,37 @@ impl oio::OneShotWrite for LakefsWriter {
let status = resp.status();
match status {
- StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
+ StatusCode::CREATED | StatusCode::OK => {
+ let body = resp.into_body();
+ let body_bytes = body.to_bytes();
+
+ // Try to parse metadata from upload response body
+ match serde_json::from_slice::<LakefsStatus>(&body_bytes) {
+ Ok(lakefs_status) => {
+ // Successfully parsed ObjectStats from upload response
+ Ok(LakefsCore::parse_lakefs_status_into_metadata(
+ &lakefs_status,
+ ))
+ }
+ Err(_) => {
+ // Upload response doesn't contain ObjectStats, fetch via stat API
+ let stat_resp = self.core.get_object_metadata(&self.path).await?;
+
+ match stat_resp.status() {
+ StatusCode::OK => {
+ let lakefs_status: LakefsStatus =
+ serde_json::from_reader(stat_resp.into_body().reader())
+ .map_err(new_json_deserialize_error)?;
+
+ Ok(LakefsCore::parse_lakefs_status_into_metadata(
+ &lakefs_status,
+ ))
+ }
+ _ => Err(parse_error(stat_resp)),
+ }
+ }
+ }
+ }
_ => Err(parse_error(resp)),
}
}