Copilot commented on code in PR #93:
URL: https://github.com/apache/fluss-rust/pull/93#discussion_r2616653506


##########
Cargo.toml:
##########
@@ -36,3 +36,8 @@ tokio = { version = "1.44.2", features = ["full"] }
 clap = { version = "4.5.37", features = ["derive"] }
 arrow = { version = "57.0.0", features = ["ipc_compression"] }
 chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+opendal = { version = "0.53", features = ["services-s3"] }

Review Comment:
   The version specified in Cargo.toml (0.53) doesn't match the version used in 
the workspace (0.53.3, as seen in crates/fluss/Cargo.toml line 57). This 
inconsistency could lead to dependency resolution issues. Consider using 
`workspace = true` to inherit the version from the workspace root instead of 
specifying it directly.
   ```suggestion
   opendal = { version = "0.53.3", features = ["services-s3"] }
   ```



##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -184,13 +223,34 @@ impl RemoteLogDownloader {
         // opendal::Reader::read accepts a range, so we read in chunks
         const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient reading
         let mut offset = 0u64;
+        let mut chunk_count = 0u64;
+        let total_chunks = file_size.div_ceil(CHUNK_SIZE);
 
         while offset < file_size {
             let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
             let range = offset..end;
-
-            // Read chunk from remote storage
-            let chunk = op.read_with(relative_path).range(range.clone()).await?;
+            chunk_count += 1;
+
+            if chunk_count <= 3 || chunk_count % 10 == 0 {
+                eprintln!(
+                    "Remote log download: reading chunk {}/{} (offset {})",
+                    chunk_count, total_chunks, offset
+                );
+            }

Review Comment:
   Using `eprintln!` for logging download progress is not appropriate for 
production code. This outputs directly to stderr and cannot be controlled or 
filtered. Consider using the `tracing` crate (which is already a dependency of 
this project) for proper structured logging. This would allow users to control 
log levels and output destinations.



##########
crates/fluss/src/client/credentials.rs:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::rpc::RpcClient;
+use crate::rpc::message::GetSecurityTokenRequest;
+use parking_lot::RwLock;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+const CACHE_TTL: Duration = Duration::from_secs(3600);
+
+#[derive(Debug, Deserialize)]
+struct Credentials {
+    access_key_id: String,
+    access_key_secret: String,
+    security_token: Option<String>,
+}
+
+struct CachedToken {
+    access_key_id: String,
+    secret_access_key: String,
+    security_token: Option<String>,
+    addition_infos: HashMap<String, String>,
+    cached_at: Instant,
+}
+
+impl CachedToken {
+    fn to_s3_props(&self) -> HashMap<String, String> {
+        let mut props = HashMap::new();
+
+        props.insert("access_key_id".to_string(), self.access_key_id.clone());
+        props.insert(
+            "secret_access_key".to_string(),
+            self.secret_access_key.clone(),
+        );
+
+        if let Some(token) = &self.security_token {
+            props.insert("security_token".to_string(), token.clone());
+        }
+
+        for (key, value) in &self.addition_infos {
+            if let Some((opendal_key, transform)) = 
convert_hadoop_key_to_opendal(key) {
+                let final_value = if transform {
+                    // Invert boolean value (path_style_access -> 
enable_virtual_host_style)
+                    if value == "true" {
+                        "false".to_string()
+                    } else {
+                        "true".to_string()
+                    }
+                } else {
+                    value.clone()
+                };
+                props.insert(opendal_key, final_value);
+            }
+        }
+
+        props
+    }
+}
+
+/// Returns (opendal_key, needs_inversion)
+/// needs_inversion is true for path_style_access -> enable_virtual_host_style conversion
+fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
+    match hadoop_key {
+        "fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
+        "fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
+        "fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
+        "fs.s3a.connection.ssl.enabled" => None,
+        _ => None,
+    }
+}
+
+pub struct CredentialsCache {
+    inner: RwLock<Option<CachedToken>>,
+}
+
+impl CredentialsCache {
+    pub fn new() -> Self {
+        Self {
+            inner: RwLock::new(None),
+        }
+    }
+
+    pub async fn get_or_refresh(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        {
+            let guard = self.inner.read();
+            if let Some(cached) = guard.as_ref() {
+                if cached.cached_at.elapsed() < CACHE_TTL {
+                    return Ok(cached.to_s3_props());
+                }
+            }
+        }
+
+        self.refresh_from_server(rpc_client, metadata).await
+    }
+
+    async fn refresh_from_server(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        let cluster = metadata.get_cluster();
+        let server_node = cluster
+            .get_coordinator_server()
+            .or_else(|| Some(cluster.get_one_available_server()))
+            .expect("no available server to fetch security token");

Review Comment:
   Using `.expect()` with a panic message bypasses proper error handling. If no 
server is available, this will cause a panic instead of returning a proper 
error. Consider using `.ok_or_else()` to return an appropriate Error type 
instead, allowing the caller to handle this failure gracefully.



##########
crates/fluss/src/proto/fluss_api.proto:
##########
@@ -297,4 +297,19 @@ message PbLakeSnapshotForBucket {
   optional int64 partition_id = 1;
   required int32 bucket_id = 2;
   optional int64 log_offset = 3;
+}
+
+message PbKeyValue {
+  required string key = 1;
+  required string value = 2;
+}
+
+message GetFileSystemSecurityTokenRequest {
+}
+
+message GetFileSystemSecurityTokenResponse {
+  required string schema = 1;

Review Comment:
   The new `GetFileSystemSecurityTokenResponse` message returns a `token` 
(parsed client-side as JSON with `access_key_id`/`access_key_secret`) over the 
existing RPC layer, which currently uses a plain TCP `Transport::Plain` without 
TLS or any encryption. Anyone with access to the network path between client 
and server can sniff this RPC and steal cloud credentials, enabling full 
compromise of the backing object store and its data. Protect this RPC with 
transport-level encryption (for example by adding TLS to the RPC transport) 
and/or change the design so that only short-lived, scoped tokens are issued and 
transmitted instead of raw access keys.
   ```suggestion
   
   // Response containing a short-lived, scoped security token for accessing the file system.
   // WARNING: Do NOT transmit raw access keys or long-lived credentials in the token field.
   // Only issue and transmit short-lived, scoped tokens (e.g., session tokens, JWTs, or
   // cloud provider temporary credentials) with minimal privileges and a short expiration.
   message GetFileSystemSecurityTokenResponse {
     required string schema = 1;
     // Short-lived, scoped security token (e.g., session token, JWT, or temporary credentials).
     // MUST NOT contain raw access keys or long-lived secrets.
   ```



##########
crates/fluss/src/client/credentials.rs:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::rpc::RpcClient;
+use crate::rpc::message::GetSecurityTokenRequest;
+use parking_lot::RwLock;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+const CACHE_TTL: Duration = Duration::from_secs(3600);
+
+#[derive(Debug, Deserialize)]
+struct Credentials {
+    access_key_id: String,
+    access_key_secret: String,
+    security_token: Option<String>,
+}
+
+struct CachedToken {
+    access_key_id: String,
+    secret_access_key: String,
+    security_token: Option<String>,
+    addition_infos: HashMap<String, String>,
+    cached_at: Instant,
+}
+
+impl CachedToken {
+    fn to_s3_props(&self) -> HashMap<String, String> {
+        let mut props = HashMap::new();
+
+        props.insert("access_key_id".to_string(), self.access_key_id.clone());
+        props.insert(
+            "secret_access_key".to_string(),
+            self.secret_access_key.clone(),
+        );
+
+        if let Some(token) = &self.security_token {
+            props.insert("security_token".to_string(), token.clone());
+        }
+
+        for (key, value) in &self.addition_infos {
+            if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+                let final_value = if transform {
+                    // Invert boolean value (path_style_access -> enable_virtual_host_style)
+                    if value == "true" {
+                        "false".to_string()
+                    } else {
+                        "true".to_string()
+                    }
+                } else {
+                    value.clone()
+                };
+                props.insert(opendal_key, final_value);
+            }
+        }
+
+        props
+    }
+}
+
+/// Returns (opendal_key, needs_inversion)
+/// needs_inversion is true for path_style_access -> enable_virtual_host_style conversion
+fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
+    match hadoop_key {
+        "fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
+        "fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
+        "fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
+        "fs.s3a.connection.ssl.enabled" => None,
+        _ => None,
+    }
+}
+
+pub struct CredentialsCache {
+    inner: RwLock<Option<CachedToken>>,
+}
+
+impl CredentialsCache {
+    pub fn new() -> Self {
+        Self {
+            inner: RwLock::new(None),
+        }
+    }
+
+    pub async fn get_or_refresh(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        {
+            let guard = self.inner.read();
+            if let Some(cached) = guard.as_ref() {
+                if cached.cached_at.elapsed() < CACHE_TTL {
+                    return Ok(cached.to_s3_props());
+                }
+            }
+        }
+
+        self.refresh_from_server(rpc_client, metadata).await
+    }
+
+    async fn refresh_from_server(
+        &self,
+        rpc_client: &Arc<RpcClient>,
+        metadata: &Arc<Metadata>,
+    ) -> Result<HashMap<String, String>> {
+        let cluster = metadata.get_cluster();
+        let server_node = cluster
+            .get_coordinator_server()
+            .or_else(|| Some(cluster.get_one_available_server()))
+            .expect("no available server to fetch security token");
+        let conn = rpc_client.get_connection(server_node).await?;
+
+        let request = GetSecurityTokenRequest::new();
+        let response = conn.request(request).await?;
+
+        let credentials: Credentials = serde_json::from_slice(&response.token)
+            .map_err(|e| Error::JsonSerdeError(e.to_string()))?;
+
+        let mut addition_infos = HashMap::new();
+        for kv in &response.addition_info {
+            addition_infos.insert(kv.key.clone(), kv.value.clone());
+        }
+
+        let cached = CachedToken {
+            access_key_id: credentials.access_key_id,
+            secret_access_key: credentials.access_key_secret,
+            security_token: credentials.security_token,
+            addition_infos,
+            cached_at: Instant::now(),
+        };
+
+        let props = cached.to_s3_props();
+        *self.inner.write() = Some(cached);
+
+        Ok(props)
+    }

Review Comment:
   The credentials cache has a fixed TTL of 1 hour (3600 seconds), but the 
response from the server includes an optional `expiration_time` field (see 
fluss_api.proto line 313). Consider using the server-provided expiration time 
when available instead of using a fixed duration, as the server may issue 
credentials with different lifetimes based on security policies.



##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -169,12 +185,35 @@ impl RemoteLogDownloader {
         // Create FileIO from the remote log tablet dir URL to get the storage
         let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;
 
+        // For S3/S3A URLs, inject S3 credentials from props
+        let file_io_builder = if remote_log_tablet_dir.starts_with("s3://")
+            || remote_log_tablet_dir.starts_with("s3a://")
+        {
+            file_io_builder.with_props(s3_props.iter().map(|(k, v)| (k.as_str(), v.as_str())))
+        } else {
+            file_io_builder
+        };
+
         // Build storage and create operator directly
         let storage = Storage::build(file_io_builder)?;
         let (op, relative_path) = storage.create(remote_path)?;
 
-        // Get file metadata to know the size
-        let meta = op.stat(relative_path).await?;
+        // Timeout for remote storage operations (30 seconds)
+        const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

Review Comment:
   The hardcoded timeout constant is defined locally in the function instead of 
at the module level or as a configurable constant. Consider moving this to a 
module-level constant for better maintainability and consistency with the 
pattern used in storage_s3.rs (lines 32-33) where timeouts are defined at the 
module level.
   ```suggestion
   
   ```



##########
crates/fluss/src/record/arrow.rs:
##########
@@ -809,6 +839,35 @@ impl ReadContext {
         };
         Ok(Some(record_batch))
     }
+
+    pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+        let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
+            Some(result) => result,
+            None => return Ok(None),
+        };
+
+        let record_batch = read_record_batch(
+            &body_buffer,
+            batch_metadata,
+            self.full_schema.clone(),
+            &std::collections::HashMap::new(),
+            None,
+            &version,
+        )?;
+
+        let record_batch = match &self.projection {
+            Some(projection) => {
+                let projected_columns: Vec<_> = projection
+                    .projected_fields
+                    .iter()
+                    .map(|&idx| record_batch.column(idx).clone())
+                    .collect();
+                RecordBatch::try_new(self.target_schema.clone(), projected_columns)?
+            }
+            None => record_batch,
+        };
+        Ok(Some(record_batch))

Review Comment:
   The comment describes client-side projection, but the implementation differs 
from the regular `record_batch()` method. The regular method applies 
`ordered_schema` when a projection exists (line 811), while this method always 
uses `full_schema` (line 852). This inconsistency could lead to incorrect data 
parsing when projections are involved. Consider documenting why this difference 
is necessary or aligning the implementations if the difference is unintentional.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to