This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 24cebe9  feat: support s3 as remote segment (#93)
24cebe9 is described below

commit 24cebe9b07f3df34bf65bf670741570cbad21830
Author: AlexZhao <[email protected]>
AuthorDate: Sat Dec 20 11:32:10 2025 +0800

    feat: support s3 as remote segment (#93)
    
    ---------
    Co-authored-by: luoyuxia <[email protected]>
---
 Cargo.toml                                         |   5 +
 crates/examples/Cargo.toml                         |   2 +-
 crates/fluss/Cargo.toml                            |  11 +-
 crates/fluss/src/client/credentials.rs             | 165 +++++++++++++++++++++
 crates/fluss/src/client/mod.rs                     |   2 +
 crates/fluss/src/client/table/remote_log.rs        |  69 +++++++--
 crates/fluss/src/client/table/scanner.rs           |   9 ++
 crates/fluss/src/io/mod.rs                         |   7 +-
 crates/fluss/src/io/storage.rs                     |  16 +-
 crates/fluss/src/io/storage_s3.rs                  |  48 ++++++
 crates/fluss/src/proto/fluss_api.proto             |  15 ++
 crates/fluss/src/record/arrow.rs                   |  67 ++++++++-
 crates/fluss/src/rpc/api_key.rs                    |   3 +
 crates/fluss/src/rpc/message/get_security_token.rs |  53 +++++++
 crates/fluss/src/rpc/message/mod.rs                |   2 +
 15 files changed, 453 insertions(+), 21 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 284a836..4155ea8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,4 +33,9 @@ fluss = { version = "0.1.0", path = "./crates/fluss" }
 tokio = { version = "1.44.2", features = ["full"] }
 clap = { version = "4.5.37", features = ["derive"] }
 arrow = { version = "57.0.0", features = ["ipc_compression"] }
+chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+opendal = "0.53"
 jiff = { version = "0.2" }
diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index dab85b6..e1fa531 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -26,7 +26,7 @@ version = { workspace = true }
 [dependencies]
 fluss = { workspace = true }
 tokio = { workspace = true }
-clap = { workspace = true}
+clap = { workspace = true }
 [[example]]
 name = "example-table"
 path = "src/example_table.rs"
\ No newline at end of file
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index aa763d5..0cf0364 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -22,11 +22,12 @@ version = { workspace = true }
 name = "fluss"
 
 [features]
-default = ["storage-memory", "storage-fs"]
-storage-all = ["storage-memory", "storage-fs"]
+default = ["storage-memory", "storage-fs", "storage-s3"]
+storage-all = ["storage-memory", "storage-fs", "storage-s3"]
 
 storage-memory = ["opendal/services-memory"]
 storage-fs = ["opendal/services-fs"]
+storage-s3 = ["opendal/services-s3"]
 integration_tests = []
 
 [dependencies]
@@ -39,9 +40,9 @@ crc32c = "0.6.8"
 linked-hash-map = "0.5.6"
 prost = "0.14"
 rand = "0.9.1"
-serde = { version = "1.0.219", features = ["derive", "rc"] }
-serde_json = "1.0.140"
+serde = { workspace = true, features = ["rc"] }
+serde_json = { workspace = true }
 thiserror = "2"
 log = { version = "0.4", features = ["kv_std"] }
 tokio = { workspace = true }
 parking_lot = "0.12"
diff --git a/crates/fluss/src/client/credentials.rs b/crates/fluss/src/client/credentials.rs
new file mode 100644
index 0000000..bd2a477
--- /dev/null
+++ b/crates/fluss/src/client/credentials.rs
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::rpc::RpcClient;
+use crate::rpc::message::GetSecurityTokenRequest;
+use parking_lot::RwLock;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+/// How long the properties derived from a server-issued token are reused
+/// before the server is asked again.
+///
+/// NOTE(review): the response's `expiration_time` is not consulted, so a token
+/// whose server-side lifetime is shorter than this TTL can be served after it
+/// has expired. TODO: confirm the unit of `expiration_time` and honor it.
+const CACHE_TTL: Duration = Duration::from_secs(3600);
+
+/// Credentials carried as JSON in the `token` bytes of the server response.
+///
+/// Deliberately not `Debug`, so the secret cannot leak into logs by accident.
+#[derive(Deserialize)]
+struct Credentials {
+    access_key_id: String,
+    access_key_secret: String,
+    security_token: Option<String>,
+}
+
+/// A decoded security token plus the extra filesystem settings sent with it.
+struct CachedToken {
+    access_key_id: String,
+    secret_access_key: String,
+    security_token: Option<String>,
+    addition_infos: HashMap<String, String>,
+}
+
+impl CachedToken {
+    /// Converts the token and the recognized `fs.s3a.*` settings into OpenDAL
+    /// S3 configuration properties.
+    fn to_remote_fs_props(&self) -> HashMap<String, String> {
+        let mut props = HashMap::new();
+
+        props.insert("access_key_id".to_string(), self.access_key_id.clone());
+        props.insert(
+            "secret_access_key".to_string(),
+            self.secret_access_key.clone(),
+        );
+
+        if let Some(token) = &self.security_token {
+            props.insert("security_token".to_string(), token.clone());
+        }
+
+        for (key, value) in &self.addition_infos {
+            if let Some((opendal_key, invert)) = convert_hadoop_key_to_opendal(key) {
+                let final_value = if invert {
+                    // fs.s3a.path.style.access=true means virtual-host style is off.
+                    // Compare case-insensitively so values such as "TRUE" count too.
+                    let path_style = value.trim().eq_ignore_ascii_case("true");
+                    (!path_style).to_string()
+                } else {
+                    value.clone()
+                };
+                props.insert(opendal_key, final_value);
+            }
+        }
+
+        props
+    }
+}
+
+/// Maps a Hadoop S3A configuration key to its OpenDAL S3 equivalent.
+///
+/// Returns `Some((opendal_key, needs_inversion))` for supported keys, where
+/// `needs_inversion` marks a boolean whose meaning is reversed in OpenDAL
+/// (`fs.s3a.path.style.access` -> `enable_virtual_host_style`), and `None` for
+/// keys without an OpenDAL counterpart (e.g. `fs.s3a.connection.ssl.enabled`).
+fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
+    match hadoop_key {
+        "fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
+        "fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
+        "fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
+        _ => None,
+    }
+}
+
+/// Properties produced by the last refresh and the time they were obtained.
+struct CacheEntry {
+    props: HashMap<String, String>,
+    cached_at: Instant,
+}
+
+/// Caches the remote filesystem properties derived from the server-issued
+/// security token, so that not every lookup needs a token RPC.
+pub struct CredentialsCache {
+    inner: RwLock<Option<CacheEntry>>,
+}
+
+impl CredentialsCache {
+    /// Creates an empty cache; the first lookup contacts the server.
+    pub fn new() -> Self {
+        Self {
+            inner: RwLock::new(None),
+        }
+    }
+
+    /// Returns the cached properties while they are younger than [`CACHE_TTL`];
+    /// otherwise requests a fresh token from the server and caches the result.
+    ///
+    /// # Errors
+    /// Fails if the token RPC fails or the returned token cannot be parsed.
+    pub async fn get_or_refresh(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        if let Some(props) = self.fresh_props() {
+            return Ok(props);
+        }
+
+        self.refresh_from_server(rpc_client, metadata).await
+    }
+
+    /// Returns a copy of the cached properties if they have not expired. The
+    /// read lock is released before returning, so it is never held across `.await`.
+    fn fresh_props(&self) -> Option<HashMap<String, String>> {
+        self.inner
+            .read()
+            .as_ref()
+            .filter(|entry| entry.cached_at.elapsed() < CACHE_TTL)
+            .map(|entry| entry.props.clone())
+    }
+
+    /// Requests a security token from an available server, converts it into
+    /// remote filesystem properties and caches them (an empty token yields an
+    /// empty property set, which is cached as well).
+    async fn refresh_from_server(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        let cluster = metadata.get_cluster();
+        let server_node = cluster.get_one_available_server();
+        let conn = rpc_client.get_connection(server_node).await?;
+
+        let request = GetSecurityTokenRequest::new();
+        let response = conn.request(request).await?;
+
+        // The token may be empty if the remote filesystem doesn't require a
+        // token to access. That outcome is cached too; otherwise every call to
+        // `get_or_refresh` would issue another token RPC.
+        let props = if response.token.is_empty() {
+            HashMap::new()
+        } else {
+            let credentials: Credentials =
+                serde_json::from_slice(&response.token).map_err(|e| {
+                    Error::JsonSerdeError(format!("Error when parse token from server: {e}"))
+                })?;
+
+            let addition_infos = response
+                .addition_info
+                .iter()
+                .map(|kv| (kv.key.clone(), kv.value.clone()))
+                .collect();
+
+            CachedToken {
+                access_key_id: credentials.access_key_id,
+                secret_access_key: credentials.access_key_secret,
+                security_token: credentials.security_token,
+                addition_infos,
+            }
+            .to_remote_fs_props()
+        };
+
+        *self.inner.write() = Some(CacheEntry {
+            props: props.clone(),
+            cached_at: Instant::now(),
+        });
+
+        Ok(props)
+    }
+}
+
+impl Default for CredentialsCache {
+    fn default() -> Self {
+        Self::new()
+    }
+}
diff --git a/crates/fluss/src/client/mod.rs b/crates/fluss/src/client/mod.rs
index a971439..cff218b 100644
--- a/crates/fluss/src/client/mod.rs
+++ b/crates/fluss/src/client/mod.rs
@@ -17,12 +17,14 @@
 
 mod admin;
 mod connection;
+mod credentials;
 mod metadata;
 mod table;
 mod write;
 
 pub use admin::*;
 pub use connection::*;
+pub use credentials::*;
 pub use metadata::*;
 pub use table::*;
 pub use write::*;
diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs
index 65805d0..a2561f3 100644
--- a/crates/fluss/src/client/table/remote_log.rs
+++ b/crates/fluss/src/client/table/remote_log.rs
@@ -20,6 +20,7 @@ use crate::metadata::TableBucket;
 use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment};
 use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord};
 use crate::util::delete_file;
+use parking_lot::RwLock;
 use std::collections::HashMap;
 use std::io;
 use std::path::{Path, PathBuf};
@@ -115,11 +116,20 @@ impl RemoteLogDownloadFuture {
 /// Downloader for remote log segment files
 pub struct RemoteLogDownloader {
     local_log_dir: TempDir,
+    remote_fs_props: RwLock<HashMap<String, String>>,
 }
 
 impl RemoteLogDownloader {
     pub fn new(local_log_dir: TempDir) -> Result<Self> {
-        Ok(Self { local_log_dir })
+        Ok(Self {
+            local_log_dir,
+            remote_fs_props: RwLock::new(HashMap::new()),
+        })
+    }
+
+    /// Replaces the filesystem properties applied to downloads requested after this call.
+    pub fn set_remote_fs_props(&self, props: HashMap<String, String>) {
+        *self.remote_fs_props.write() = props;
     }
 
     /// Request to fetch a remote log segment to local. This method is non-blocking.
@@ -133,10 +143,16 @@ impl RemoteLogDownloader {
         let local_file_path = self.local_log_dir.path().join(&local_file_name);
         let remote_path = self.build_remote_path(remote_log_tablet_dir, segment);
         let remote_log_tablet_dir = remote_log_tablet_dir.to_string();
+        let remote_fs_props = self.remote_fs_props.read().clone();
         // Spawn async download task
         tokio::spawn(async move {
-            let result =
-                Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await;
+            let result = Self::download_file(
+                &remote_log_tablet_dir,
+                &remote_path,
+                &local_file_path,
+                &remote_fs_props,
+            )
+            .await;
             let _ = sender.send(result);
         });
         Ok(RemoteLogDownloadFuture::new(receiver))
@@ -157,6 +173,7 @@ impl RemoteLogDownloader {
         remote_log_tablet_dir: &str,
         remote_path: &str,
         local_path: &Path,
+        remote_fs_props: &HashMap<String, String>,
     ) -> Result<PathBuf> {
         // Handle both URL (e.g., "s3://bucket/path") and local file paths
         // If the path doesn't contain "://", treat it as a local file path
@@ -169,11 +186,34 @@ impl RemoteLogDownloader {
         // Create FileIO from the remote log tablet dir URL to get the storage
         let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;
 
+        // For S3/S3A URLs, inject S3 credentials from props
+        let file_io_builder = if remote_log_tablet_dir.starts_with("s3://")
+            || remote_log_tablet_dir.starts_with("s3a://")
+        {
+            file_io_builder.with_props(
+                remote_fs_props
+                    .iter()
+                    .map(|(k, v)| (k.as_str(), v.as_str())),
+            )
+        } else {
+            file_io_builder
+        };
+
         // Build storage and create operator directly
         let storage = Storage::build(file_io_builder)?;
         let (op, relative_path) = storage.create(remote_path)?;
 
-        // Get file metadata to know the size
-        let meta = op.stat(relative_path).await?;
+        // Timeout for remote storage operations (30 seconds)
+        const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
+
+        // Get file metadata to know the size, bounded by REMOTE_OP_TIMEOUT
+        let meta = tokio::time::timeout(REMOTE_OP_TIMEOUT, op.stat(relative_path))
+            .await
+            .map_err(|_| {
+                Error::Io(io::Error::new(
+                    io::ErrorKind::TimedOut,
+                    format!("Timeout getting metadata from remote storage: {remote_path}"),
+                ))
+            })??;
         let file_size = meta.content_length();
 
@@ -184,13 +224,32 @@ impl RemoteLogDownloader {
         // opendal::Reader::read accepts a range, so we read in chunks
         const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient reading
         let mut offset = 0u64;
+        let mut chunk_count = 0u64;
+        let total_chunks = file_size.div_ceil(CHUNK_SIZE);
 
         while offset < file_size {
             let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
             let range = offset..end;
-
-            // Read chunk from remote storage
-            let chunk = op.read_with(relative_path).range(range.clone()).await?;
+            chunk_count += 1;
+
+            if chunk_count <= 3 || chunk_count % 10 == 0 {
+                log::debug!(
+                    "Remote log download: reading chunk {chunk_count}/{total_chunks} (offset {offset})"
+                );
+            }
+
+            // Read chunk from remote storage with timeout
+            let read_future = op.read_with(relative_path).range(range.clone());
+            let chunk = tokio::time::timeout(REMOTE_OP_TIMEOUT, read_future)
+                .await
+                .map_err(|_| {
+                    Error::Io(io::Error::new(
+                        io::ErrorKind::TimedOut,
+                        format!(
+                            "Timeout reading chunk from remote storage: {remote_path} at offset {offset}"
+                        ),
+                    ))
+                })??;
             let bytes = chunk.to_bytes();
 
             // Write chunk to local file
@@ -254,10 +313,10 @@ impl RemotePendingFetch {
         // delete the downloaded local file to free disk
         delete_file(file_path).await;
 
-        // Parse log records
+        // Parse log records (remote log contains full data, need client-side projection)
         let mut fetch_records = vec![];
         for log_record in &mut LogRecordsBatchs::new(data) {
-            fetch_records.extend(log_record.records(&self.read_context)?);
+            fetch_records.extend(log_record.records_for_remote_log(&self.read_context)?);
         }
 
         let mut result = HashMap::new();
diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs
index f6780d7..f66d7d7 100644
--- a/crates/fluss/src/client/table/scanner.rs
+++ b/crates/fluss/src/client/table/scanner.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use crate::client::connection::FlussConnection;
+use crate::client::credentials::CredentialsCache;
 use crate::client::metadata::Metadata;
 use crate::error::{Error, Result};
 use crate::metadata::{TableBucket, TableInfo, TablePath};
@@ -194,6 +195,7 @@ struct LogFetcher {
     log_scanner_status: Arc<LogScannerStatus>,
     read_context: ReadContext,
     remote_log_downloader: RemoteLogDownloader,
+    credentials_cache: CredentialsCache,
 }
 
 impl LogFetcher {
@@ -217,6 +219,7 @@ impl LogFetcher {
             log_scanner_status,
             read_context,
             remote_log_downloader: RemoteLogDownloader::new(tmp_dir)?,
+            credentials_cache: CredentialsCache::new(),
         })
     }
 
@@ -256,6 +259,12 @@ impl LogFetcher {
                     if let Some(ref remote_log_fetch_info) =
                         fetch_log_for_bucket.remote_log_fetch_info
                     {
+                        let remote_fs_props = self
+                            .credentials_cache
+                            .get_or_refresh(&self.conns, &self.metadata)
+                            .await?;
+                        self.remote_log_downloader
+                            .set_remote_fs_props(remote_fs_props);
                         let remote_fetch_info = RemoteLogFetchInfo::from_proto(
                             remote_log_fetch_info,
                             table_bucket.clone(),
diff --git a/crates/fluss/src/io/mod.rs b/crates/fluss/src/io/mod.rs
index 3c9a165..a03a394 100644
--- a/crates/fluss/src/io/mod.rs
+++ b/crates/fluss/src/io/mod.rs
@@ -27,8 +27,13 @@ pub use storage::*;
 mod storage_fs;
 #[cfg(feature = "storage-fs")]
 use storage_fs::*;
+
 #[cfg(feature = "storage-memory")]
 mod storage_memory;
-
 #[cfg(feature = "storage-memory")]
 use storage_memory::*;
+
+#[cfg(feature = "storage-s3")]
+mod storage_s3;
+#[cfg(feature = "storage-s3")]
+use storage_s3::*;
diff --git a/crates/fluss/src/io/storage.rs b/crates/fluss/src/io/storage.rs
index 361da7e..089670e 100644
--- a/crates/fluss/src/io/storage.rs
+++ b/crates/fluss/src/io/storage.rs
@@ -19,6 +19,7 @@ use crate::error;
 use crate::error::Result;
 use crate::io::FileIOBuilder;
 use opendal::{Operator, Scheme};
+use std::collections::HashMap;
 
 /// The storage carries all supported storage services in fluss
 #[derive(Debug)]
@@ -27,11 +28,13 @@ pub enum Storage {
     Memory,
     #[cfg(feature = "storage-fs")]
     LocalFs,
+    #[cfg(feature = "storage-s3")]
+    S3 { props: HashMap<String, String> },
 }
 
 impl Storage {
     pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
-        let (scheme_str, _) = file_io_builder.into_parts();
+        let (scheme_str, props) = file_io_builder.into_parts();
         let scheme = Self::parse_scheme(&scheme_str)?;
 
         match scheme {
@@ -39,6 +42,8 @@ impl Storage {
             Scheme::Memory => Ok(Self::Memory),
             #[cfg(feature = "storage-fs")]
             Scheme::Fs => Ok(Self::LocalFs),
+            #[cfg(feature = "storage-s3")]
+            Scheme::S3 => Ok(Self::S3 { props }),
             _ => Err(error::Error::IoUnsupported(
                 "Unsupported storage feature".to_string(),
             )),
@@ -66,6 +71,14 @@ impl Storage {
                     Ok((op, &path[1..]))
                 }
             }
+            #[cfg(feature = "storage-s3")]
+            Storage::S3 { props } => {
+                let (bucket, key) = super::parse_s3_path(path);
+                let mut s3_props = props.clone();
+                s3_props.insert("bucket".to_string(), bucket.to_string());
+                let op = super::s3_config_build(&s3_props)?;
+                Ok((op, key))
+            }
         }
     }
 
@@ -73,6 +86,7 @@ impl Storage {
         match scheme {
             "memory" => Ok(Scheme::Memory),
             "file" | "" => Ok(Scheme::Fs),
+            "s3" | "s3a" => Ok(Scheme::S3),
             s => Ok(s.parse::<Scheme>()?),
         }
     }
diff --git a/crates/fluss/src/io/storage_s3.rs b/crates/fluss/src/io/storage_s3.rs
new file mode 100644
index 0000000..8000d09
--- /dev/null
+++ b/crates/fluss/src/io/storage_s3.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Result;
+use opendal::Configurator;
+use opendal::Operator;
+use opendal::layers::TimeoutLayer;
+use opendal::services::S3Config;
+use std::collections::HashMap;
+use std::time::Duration;
+
+/// Builds an OpenDAL S3 operator from OpenDAL S3 configuration properties
+/// (such as `bucket`), wrapped in a [`TimeoutLayer`] so that a stalled request
+/// fails instead of hanging the caller.
+pub(crate) fn s3_config_build(props: &HashMap<String, String>) -> Result<Operator> {
+    let config = S3Config::from_iter(props.clone())?;
+    let op = Operator::from_config(config)?.finish();
+
+    // Add timeout layer to prevent hanging on S3 operations
+    let timeout_layer = TimeoutLayer::new()
+        .with_timeout(Duration::from_secs(10))
+        .with_io_timeout(Duration::from_secs(30));
+
+    Ok(op.layer(timeout_layer))
+}
+
+/// Splits an `s3://` or `s3a://` URL into `(bucket, key)`, e.g.
+/// `s3://bucket/a/b` -> `("bucket", "a/b")`.
+///
+/// A path without either prefix is split the same way, and a path with no `/`
+/// after the bucket yields an empty key.
+pub(crate) fn parse_s3_path(path: &str) -> (&str, &str) {
+    let path = path
+        .strip_prefix("s3a://")
+        .or_else(|| path.strip_prefix("s3://"))
+        .unwrap_or(path);
+
+    match path.find('/') {
+        Some(idx) => (&path[..idx], &path[idx + 1..]),
+        None => (path, ""),
+    }
+}
diff --git a/crates/fluss/src/proto/fluss_api.proto b/crates/fluss/src/proto/fluss_api.proto
index ef460fc..e59c2d9 100644
--- a/crates/fluss/src/proto/fluss_api.proto
+++ b/crates/fluss/src/proto/fluss_api.proto
@@ -297,4 +297,19 @@ message PbLakeSnapshotForBucket {
   optional int64 partition_id = 1;
   required int32 bucket_id = 2;
   optional int64 log_offset = 3;
+}
+
+message PbKeyValue {
+  required string key = 1;
+  required string value = 2;
+}
+
+message GetFileSystemSecurityTokenRequest {
+}
+
+message GetFileSystemSecurityTokenResponse {
+  required string schema = 1;
+  required bytes token = 2;
+  optional int64 expiration_time = 3;
+  repeated PbKeyValue addition_info = 4;
 }
\ No newline at end of file
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 806c9a5..f079f09 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -504,6 +504,32 @@ impl<'a> LogRecordBatch<'a> {
         };
         Ok(log_record_iterator)
     }
+
+    /// Like `records`, but for batches read from remote log segments: those hold
+    /// every column, so any projection is applied here on the client side.
+    pub fn records_for_remote_log(&self, read_context: &ReadContext) -> Result<LogRecordIterator> {
+        if self.record_count() == 0 {
+            return Ok(LogRecordIterator::empty());
+        }
+
+        let data = &self.data[RECORDS_OFFSET..];
+
+        let record_batch = read_context.record_batch_for_remote_log(data)?;
+        let log_record_iterator = match record_batch {
+            None => LogRecordIterator::empty(),
+            Some(record_batch) => {
+                let arrow_reader = ArrowReader::new(Arc::new(record_batch));
+                LogRecordIterator::Arrow(ArrowLogRecordIterator {
+                    reader: arrow_reader,
+                    base_offset: self.base_log_offset(),
+                    timestamp: self.commit_timestamp(),
+                    row_id: 0,
+                    change_type: ChangeType::AppendOnly,
+                })
+            }
+        };
+        Ok(log_record_iterator)
+    }
 }
 
 /// Parse an Arrow IPC message from a byte slice.
@@ -552,7 +578,9 @@ fn parse_ipc_message(
     let message = root_as_message(metadata_bytes).ok()?;
     let batch_metadata = message.header_as_record_batch()?;
 
-    let body_start = 8 + metadata_size;
+    // The body starts at the first 8-byte boundary after the metadata.
+    let metadata_padded_size = (metadata_size + 7) & !7;
+    let body_start = 8 + metadata_padded_size;
     let body_data = &data[body_start..];
     let body_buffer = Buffer::from(body_data);
 
@@ -677,7 +705,7 @@ pub fn to_arrow_type(fluss_type: &DataType) -> ArrowDataType {
 #[derive(Clone)]
 pub struct ReadContext {
     target_schema: SchemaRef,
-
+    full_schema: SchemaRef,
     projection: Option<Projection>,
 }
 
@@ -694,7 +722,8 @@ struct Projection {
 impl ReadContext {
     pub fn new(arrow_schema: SchemaRef) -> ReadContext {
         ReadContext {
-            target_schema: arrow_schema,
+            target_schema: arrow_schema.clone(),
+            full_schema: arrow_schema,
             projection: None,
         }
     }
@@ -730,7 +759,10 @@ impl ReadContext {
                 }
             } else {
                 Projection {
-                    ordered_schema: Self::project_schema(arrow_schema, projected_fields.as_slice()),
+                    ordered_schema: Self::project_schema(
+                        arrow_schema.clone(),
+                        projected_fields.as_slice(),
+                    ),
                     ordered_fields: projected_fields.clone(),
                     projected_fields,
                     reordering_indexes: vec![],
@@ -741,6 +773,7 @@ impl ReadContext {
 
         ReadContext {
             target_schema,
+            full_schema: arrow_schema,
             projection: Some(project),
         }
     }
@@ -809,6 +842,37 @@ impl ReadContext {
         };
         Ok(Some(record_batch))
     }
+
+    /// Decodes an Arrow IPC record batch with the full table schema, then keeps
+    /// only the projected columns (in `projected_fields` order) if a projection is set.
+    pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+        let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
+            Some(result) => result,
+            None => return Ok(None),
+        };
+
+        let record_batch = read_record_batch(
+            &body_buffer,
+            batch_metadata,
+            self.full_schema.clone(),
+            &std::collections::HashMap::new(),
+            None,
+            &version,
+        )?;
+
+        let record_batch = match &self.projection {
+            Some(projection) => {
+                let projected_columns: Vec<_> = projection
+                    .projected_fields
+                    .iter()
+                    .map(|&idx| record_batch.column(idx).clone())
+                    .collect();
+                RecordBatch::try_new(self.target_schema.clone(), projected_columns)?
+            }
+            None => record_batch,
+        };
+        Ok(Some(record_batch))
+    }
 }
 
 pub enum LogRecordIterator {
diff --git a/crates/fluss/src/rpc/api_key.rs b/crates/fluss/src/rpc/api_key.rs
index 215bb39..b11647f 100644
--- a/crates/fluss/src/rpc/api_key.rs
+++ b/crates/fluss/src/rpc/api_key.rs
@@ -32,6 +32,7 @@ pub enum ApiKey {
     ProduceLog,
     FetchLog,
     ListOffsets,
+    GetFileSystemSecurityToken,
     GetDatabaseInfo,
     GetLatestLakeSnapshot,
     Unknown(i16),
@@ -53,6 +54,7 @@ impl From<i16> for ApiKey {
             1014 => ApiKey::ProduceLog,
             1015 => ApiKey::FetchLog,
             1021 => ApiKey::ListOffsets,
+            1025 => ApiKey::GetFileSystemSecurityToken,
             1032 => ApiKey::GetLatestLakeSnapshot,
             1035 => ApiKey::GetDatabaseInfo,
             _ => Unknown(key),
@@ -76,6 +78,7 @@ impl From<ApiKey> for i16 {
             ApiKey::ProduceLog => 1014,
             ApiKey::FetchLog => 1015,
             ApiKey::ListOffsets => 1021,
+            ApiKey::GetFileSystemSecurityToken => 1025,
             ApiKey::GetLatestLakeSnapshot => 1032,
             ApiKey::GetDatabaseInfo => 1035,
             Unknown(x) => x,
diff --git a/crates/fluss/src/rpc/message/get_security_token.rs b/crates/fluss/src/rpc/message/get_security_token.rs
new file mode 100644
index 0000000..7995232
--- /dev/null
+++ b/crates/fluss/src/rpc/message/get_security_token.rs
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::proto::{GetFileSystemSecurityTokenRequest, GetFileSystemSecurityTokenResponse};
+use crate::rpc::api_key::ApiKey;
+use crate::rpc::api_version::ApiVersion;
+use crate::rpc::frame::{ReadError, WriteError};
+use crate::rpc::message::{ReadVersionedType, RequestBody, WriteVersionedType};
+use crate::{impl_read_version_type, impl_write_version_type};
+use bytes::{Buf, BufMut};
+use prost::Message;
+
+/// Request for a security token used to access the remote file system
+/// (`ApiKey::GetFileSystemSecurityToken`, request version 0).
+#[derive(Debug)]
+pub struct GetSecurityTokenRequest {
+    pub inner_request: GetFileSystemSecurityTokenRequest,
+}
+
+impl GetSecurityTokenRequest {
+    /// Creates the request; the underlying protobuf message has no fields.
+    pub fn new() -> Self {
+        Self {
+            inner_request: GetFileSystemSecurityTokenRequest {},
+        }
+    }
+}
+
+impl Default for GetSecurityTokenRequest {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl RequestBody for GetSecurityTokenRequest {
+    type ResponseBody = GetFileSystemSecurityTokenResponse;
+    const API_KEY: ApiKey = ApiKey::GetFileSystemSecurityToken;
+    const REQUEST_VERSION: ApiVersion = ApiVersion(0);
+}
+
+impl_write_version_type!(GetSecurityTokenRequest);
+impl_read_version_type!(GetFileSystemSecurityTokenResponse);
diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs
index 230d971..0ed5b7c 100644
--- a/crates/fluss/src/rpc/message/mod.rs
+++ b/crates/fluss/src/rpc/message/mod.rs
@@ -28,6 +28,7 @@ mod drop_table;
 mod fetch;
 mod get_database_info;
 mod get_latest_lake_snapshot;
+mod get_security_token;
 mod get_table;
 mod header;
 mod list_databases;
@@ -45,6 +46,7 @@ pub use drop_table::*;
 pub use fetch::*;
 pub use get_database_info::*;
 pub use get_latest_lake_snapshot::*;
+pub use get_security_token::*;
 pub use get_table::*;
 pub use header::*;
 pub use list_databases::*;

Reply via email to