This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f6738490 feat(services/lakefs): Implement write returns metadata 
(#6770)
0f6738490 is described below

commit 0f6738490dc1d21d0e3db4bd6f48621e676a9f86
Author: Kingsword <[email protected]>
AuthorDate: Mon Nov 10 16:16:45 2025 +0800

    feat(services/lakefs): Implement write returns metadata (#6770)
---
 core/src/services/lakefs/backend.rs | 15 ++++-----------
 core/src/services/lakefs/core.rs    | 35 +++++++++++++++++++++++++++++++++++
 core/src/services/lakefs/writer.rs  | 34 +++++++++++++++++++++++++++++++++-
 3 files changed, 72 insertions(+), 12 deletions(-)

diff --git a/core/src/services/lakefs/backend.rs 
b/core/src/services/lakefs/backend.rs
index f59cd3dec..c4ffc6b6e 100644
--- a/core/src/services/lakefs/backend.rs
+++ b/core/src/services/lakefs/backend.rs
@@ -210,20 +210,13 @@ impl Access for LakefsBackend {
 
         match status {
             StatusCode::OK => {
-                let mut meta = parse_into_metadata(path, resp.headers())?;
-                let bs = resp.clone().into_body();
+                let bs = resp.into_body();
 
                 let decoded_response: LakefsStatus =
                     
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
-                if let Some(size_bytes) = decoded_response.size_bytes {
-                    meta.set_content_length(size_bytes);
-                }
-                meta.set_mode(EntryMode::FILE);
-                if let Some(v) = parse_content_disposition(resp.headers())? {
-                    meta.set_content_disposition(v);
-                }
-
-                
meta.set_last_modified(Timestamp::from_second(decoded_response.mtime).unwrap());
+
+                // Use the helper function to parse LakefsStatus into Metadata
+                let meta = 
LakefsCore::parse_lakefs_status_into_metadata(&decoded_response);
 
                 Ok(RpStat::new(meta))
             }
diff --git a/core/src/services/lakefs/core.rs b/core/src/services/lakefs/core.rs
index 95008ad92..86051acd4 100644
--- a/core/src/services/lakefs/core.rs
+++ b/core/src/services/lakefs/core.rs
@@ -233,6 +233,41 @@ impl LakefsCore {
             .map_err(new_request_build_error)?;
         self.info.http_client().send(req).await
     }
+
+    /// Parse LakefsStatus into Metadata
+    pub fn parse_lakefs_status_into_metadata(status: &LakefsStatus) -> 
Metadata {
+        // Determine entry mode based on path_type
+        // "common_prefix" indicates a directory in list operations
+        let mode = if status.path_type == "common_prefix" {
+            EntryMode::DIR
+        } else {
+            EntryMode::FILE
+        };
+
+        let mut meta = Metadata::new(mode);
+
+        // Set checksum as etag
+        if !status.checksum.is_empty() {
+            meta.set_etag(&status.checksum);
+        }
+
+        // Set content length
+        if let Some(size) = status.size_bytes {
+            meta.set_content_length(size);
+        }
+
+        // Set content type
+        if let Some(ref content_type) = status.content_type {
+            meta.set_content_type(content_type);
+        }
+
+        // Set last modified time
+        if let Ok(timestamp) = Timestamp::from_second(status.mtime) {
+            meta.set_last_modified(timestamp);
+        }
+
+        meta
+    }
 }
 
 #[derive(Deserialize, Eq, PartialEq, Debug)]
diff --git a/core/src/services/lakefs/writer.rs 
b/core/src/services/lakefs/writer.rs
index c45805756..8699cc07e 100644
--- a/core/src/services/lakefs/writer.rs
+++ b/core/src/services/lakefs/writer.rs
@@ -17,9 +17,11 @@
 
 use std::sync::Arc;
 
+use bytes::Buf;
 use http::StatusCode;
 
 use super::core::LakefsCore;
+use super::core::LakefsStatus;
 use super::error::parse_error;
 use crate::raw::*;
 use crate::*;
@@ -43,7 +45,37 @@ impl oio::OneShotWrite for LakefsWriter {
         let status = resp.status();
 
         match status {
-            StatusCode::CREATED | StatusCode::OK => Ok(Metadata::default()),
+            StatusCode::CREATED | StatusCode::OK => {
+                let body = resp.into_body();
+                let body_bytes = body.to_bytes();
+
+                // Try to parse metadata from upload response body
+                match serde_json::from_slice::<LakefsStatus>(&body_bytes) {
+                    Ok(lakefs_status) => {
+                        // Successfully parsed ObjectStats from upload response
+                        Ok(LakefsCore::parse_lakefs_status_into_metadata(
+                            &lakefs_status,
+                        ))
+                    }
+                    Err(_) => {
+                        // Upload response doesn't contain ObjectStats, fetch 
via stat API
+                        let stat_resp = 
self.core.get_object_metadata(&self.path).await?;
+
+                        match stat_resp.status() {
+                            StatusCode::OK => {
+                                let lakefs_status: LakefsStatus =
+                                    
serde_json::from_reader(stat_resp.into_body().reader())
+                                        .map_err(new_json_deserialize_error)?;
+
+                                
Ok(LakefsCore::parse_lakefs_status_into_metadata(
+                                    &lakefs_status,
+                                ))
+                            }
+                            _ => Err(parse_error(stat_resp)),
+                        }
+                    }
+                }
+            }
             _ => Err(parse_error(resp)),
         }
     }

Reply via email to