Copilot commented on code in PR #93:
URL: https://github.com/apache/fluss-rust/pull/93#discussion_r2616653506
##########
Cargo.toml:
##########
@@ -36,3 +36,8 @@ tokio = { version = "1.44.2", features = ["full"] }
clap = { version = "4.5.37", features = ["derive"] }
arrow = { version = "57.0.0", features = ["ipc_compression"] }
chrono = { version = "0.4", features = ["clock", "std", "wasmbind"] }
+
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+opendal = { version = "0.53", features = ["services-s3"] }
Review Comment:
The version specified in Cargo.toml (0.53) doesn't match the version used in
the workspace (0.53.3 as seen in crates/fluss/Cargo.toml line 57). This
inconsistency could lead to dependency resolution issues. Consider using
`workspace = true` to inherit the version from the workspace root instead of
specifying it directly.
```suggestion
opendal = { version = "0.53.3", features = ["services-s3"] }
```
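For illustration, a hedged sketch of the `workspace = true` approach the comment describes; the exact `[workspace.dependencies]` entry shown for the workspace root is an assumption about this repository's layout, not a quote from it:
```toml
# Workspace root Cargo.toml (assumed): declare the canonical version once.
[workspace.dependencies]
opendal = { version = "0.53.3", features = ["services-s3"] }

# Member crate Cargo.toml: inherit the version instead of repeating it.
[dependencies]
opendal = { workspace = true }
```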
##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -184,13 +223,34 @@ impl RemoteLogDownloader {
// opendal::Reader::read accepts a range, so we read in chunks
        const CHUNK_SIZE: u64 = 8 * 1024 * 1024; // 8MB chunks for efficient reading
let mut offset = 0u64;
+ let mut chunk_count = 0u64;
+ let total_chunks = file_size.div_ceil(CHUNK_SIZE);
while offset < file_size {
let end = std::cmp::min(offset + CHUNK_SIZE, file_size);
let range = offset..end;
-
- // Read chunk from remote storage
-            let chunk = op.read_with(relative_path).range(range.clone()).await?;
+ chunk_count += 1;
+
+ if chunk_count <= 3 || chunk_count % 10 == 0 {
+ eprintln!(
+ "Remote log download: reading chunk {}/{} (offset {})",
+ chunk_count, total_chunks, offset
+ );
+ }
Review Comment:
Using `eprintln!` for logging download progress is not appropriate for
production code. This outputs directly to stderr and cannot be controlled or
filtered. Consider using the `tracing` crate (which is already a dependency of
this project) for proper structured logging. This would allow users to control
log levels and output destinations.
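As a hedged sketch (not this project's existing logging setup), the same progress gate expressed as a `tracing` event, which a subscriber can filter by level and target:
```rust
use tracing::debug;

/// Hypothetical helper mirroring the progress print above; it emits a
/// structured event instead of writing to stderr, so verbosity and output
/// destination stay under the control of whatever subscriber is installed.
fn log_chunk_progress(chunk_count: u64, total_chunks: u64, offset: u64) {
    if chunk_count <= 3 || chunk_count % 10 == 0 {
        debug!(chunk_count, total_chunks, offset, "remote log download: reading chunk");
    }
}
```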
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::rpc::RpcClient;
+use crate::rpc::message::GetSecurityTokenRequest;
+use parking_lot::RwLock;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+const CACHE_TTL: Duration = Duration::from_secs(3600);
+
+#[derive(Debug, Deserialize)]
+struct Credentials {
+ access_key_id: String,
+ access_key_secret: String,
+ security_token: Option<String>,
+}
+
+struct CachedToken {
+ access_key_id: String,
+ secret_access_key: String,
+ security_token: Option<String>,
+ addition_infos: HashMap<String, String>,
+ cached_at: Instant,
+}
+
+impl CachedToken {
+ fn to_s3_props(&self) -> HashMap<String, String> {
+ let mut props = HashMap::new();
+
+ props.insert("access_key_id".to_string(), self.access_key_id.clone());
+ props.insert(
+ "secret_access_key".to_string(),
+ self.secret_access_key.clone(),
+ );
+
+ if let Some(token) = &self.security_token {
+ props.insert("security_token".to_string(), token.clone());
+ }
+
+ for (key, value) in &self.addition_infos {
+            if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+                let final_value = if transform {
+                    // Invert boolean value (path_style_access -> enable_virtual_host_style)
+ if value == "true" {
+ "false".to_string()
+ } else {
+ "true".to_string()
+ }
+ } else {
+ value.clone()
+ };
+ props.insert(opendal_key, final_value);
+ }
+ }
+
+ props
+ }
+}
+
+/// Returns (opendal_key, needs_inversion)
+/// needs_inversion is true for path_style_access -> enable_virtual_host_style conversion
+fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
+    match hadoop_key {
+        "fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
+        "fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
+        "fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
+        "fs.s3a.connection.ssl.enabled" => None,
+        _ => None,
+    }
+}
+
+pub struct CredentialsCache {
+ inner: RwLock<Option<CachedToken>>,
+}
+
+impl CredentialsCache {
+ pub fn new() -> Self {
+ Self {
+ inner: RwLock::new(None),
+ }
+ }
+
+ pub async fn get_or_refresh(
+ &self,
+ rpc_client: &Arc<RpcClient>,
+ metadata: &Arc<Metadata>,
+ ) -> Result<HashMap<String, String>> {
+ {
+ let guard = self.inner.read();
+ if let Some(cached) = guard.as_ref() {
+ if cached.cached_at.elapsed() < CACHE_TTL {
+ return Ok(cached.to_s3_props());
+ }
+ }
+ }
+
+ self.refresh_from_server(rpc_client, metadata).await
+ }
+
+ async fn refresh_from_server(
+ &self,
+ rpc_client: &Arc<RpcClient>,
+ metadata: &Arc<Metadata>,
+ ) -> Result<HashMap<String, String>> {
+ let cluster = metadata.get_cluster();
+ let server_node = cluster
+ .get_coordinator_server()
+ .or_else(|| Some(cluster.get_one_available_server()))
+ .expect("no available server to fetch security token");
Review Comment:
Using `.expect()` with a panic message bypasses proper error handling. If no
server is available, this will cause a panic instead of returning a proper
error. Consider using `.ok_or_else()` to return an appropriate Error type
instead, allowing the caller to handle this failure gracefully.
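For illustration only, a sketch of the `ok_or_else` shape being suggested; the `Error::UnavailableError` variant is a placeholder, not this crate's actual error type:
```rust
let server_node = cluster
    .get_coordinator_server()
    .or_else(|| Some(cluster.get_one_available_server()))
    .ok_or_else(|| {
        // Hypothetical error variant; substitute whatever this crate's Error exposes.
        Error::UnavailableError("no available server to fetch security token".to_string())
    })?;
```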
##########
crates/fluss/src/proto/fluss_api.proto:
##########
@@ -297,4 +297,19 @@ message PbLakeSnapshotForBucket {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
optional int64 log_offset = 3;
+}
+
+message PbKeyValue {
+ required string key = 1;
+ required string value = 2;
+}
+
+message GetFileSystemSecurityTokenRequest {
+}
+
+message GetFileSystemSecurityTokenResponse {
+ required string schema = 1;
Review Comment:
The new `GetFileSystemSecurityTokenResponse` message returns a `token`
(parsed client-side as JSON with `access_key_id`/`access_key_secret`) over the
existing RPC layer, which currently uses a plain TCP `Transport::Plain` without
TLS or any encryption. Anyone with access to the network path between client
and server can sniff this RPC and steal cloud credentials, enabling full
compromise of the backing object store and its data. Protect this RPC with
transport-level encryption (for example by adding TLS to the RPC transport)
and/or change the design so that only short-lived, scoped tokens are issued and
transmitted instead of raw access keys.
```suggestion
// Response containing a short-lived, scoped security token for accessing the file system.
// WARNING: Do NOT transmit raw access keys or long-lived credentials in the token field.
// Only issue and transmit short-lived, scoped tokens (e.g., session tokens, JWTs, or
// cloud provider temporary credentials) with minimal privileges and a short expiration.
message GetFileSystemSecurityTokenResponse {
  required string schema = 1;
  // Short-lived, scoped security token (e.g., session token, JWT, or temporary credentials).
  // MUST NOT contain raw access keys or long-lived secrets.
```
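As one possible direction for the transport-encryption recommendation, a hedged client-side sketch using `tokio-rustls`; the crate choice, the rustls 0.23-era APIs, and how the resulting stream would be wired into this project's RPC transport are all assumptions:
```rust
use std::sync::Arc;

use tokio::net::TcpStream;
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
use tokio_rustls::{client::TlsStream, TlsConnector};

// Establish a TLS-wrapped TCP connection; the returned TlsStream could stand in
// for the plain TcpStream the RPC layer currently uses.
async fn connect_tls(host: &str, port: u16) -> std::io::Result<TlsStream<TcpStream>> {
    let mut roots = RootCertStore::empty();
    roots.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
    let config = ClientConfig::builder()
        .with_root_certificates(roots)
        .with_no_client_auth();
    let connector = TlsConnector::from(Arc::new(config));
    let tcp = TcpStream::connect((host, port)).await?;
    let server_name = ServerName::try_from(host.to_owned())
        .map_err(|_| std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid server name"))?;
    connector.connect(server_name, tcp).await
}
```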
##########
crates/fluss/src/client/credentials.rs:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::client::metadata::Metadata;
+use crate::error::{Error, Result};
+use crate::rpc::RpcClient;
+use crate::rpc::message::GetSecurityTokenRequest;
+use parking_lot::RwLock;
+use serde::Deserialize;
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+const CACHE_TTL: Duration = Duration::from_secs(3600);
+
+#[derive(Debug, Deserialize)]
+struct Credentials {
+ access_key_id: String,
+ access_key_secret: String,
+ security_token: Option<String>,
+}
+
+struct CachedToken {
+ access_key_id: String,
+ secret_access_key: String,
+ security_token: Option<String>,
+ addition_infos: HashMap<String, String>,
+ cached_at: Instant,
+}
+
+impl CachedToken {
+ fn to_s3_props(&self) -> HashMap<String, String> {
+ let mut props = HashMap::new();
+
+ props.insert("access_key_id".to_string(), self.access_key_id.clone());
+ props.insert(
+ "secret_access_key".to_string(),
+ self.secret_access_key.clone(),
+ );
+
+ if let Some(token) = &self.security_token {
+ props.insert("security_token".to_string(), token.clone());
+ }
+
+ for (key, value) in &self.addition_infos {
+            if let Some((opendal_key, transform)) = convert_hadoop_key_to_opendal(key) {
+                let final_value = if transform {
+                    // Invert boolean value (path_style_access -> enable_virtual_host_style)
+ if value == "true" {
+ "false".to_string()
+ } else {
+ "true".to_string()
+ }
+ } else {
+ value.clone()
+ };
+ props.insert(opendal_key, final_value);
+ }
+ }
+
+ props
+ }
+}
+
+/// Returns (opendal_key, needs_inversion)
+/// needs_inversion is true for path_style_access -> enable_virtual_host_style conversion
+fn convert_hadoop_key_to_opendal(hadoop_key: &str) -> Option<(String, bool)> {
+    match hadoop_key {
+        "fs.s3a.endpoint" => Some(("endpoint".to_string(), false)),
+        "fs.s3a.endpoint.region" => Some(("region".to_string(), false)),
+        "fs.s3a.path.style.access" => Some(("enable_virtual_host_style".to_string(), true)),
+        "fs.s3a.connection.ssl.enabled" => None,
+        _ => None,
+    }
+}
+
+pub struct CredentialsCache {
+ inner: RwLock<Option<CachedToken>>,
+}
+
+impl CredentialsCache {
+ pub fn new() -> Self {
+ Self {
+ inner: RwLock::new(None),
+ }
+ }
+
+ pub async fn get_or_refresh(
+ &self,
+ rpc_client: &Arc<RpcClient>,
+ metadata: &Arc<Metadata>,
+ ) -> Result<HashMap<String, String>> {
+ {
+ let guard = self.inner.read();
+ if let Some(cached) = guard.as_ref() {
+ if cached.cached_at.elapsed() < CACHE_TTL {
+ return Ok(cached.to_s3_props());
+ }
+ }
+ }
+
+ self.refresh_from_server(rpc_client, metadata).await
+ }
+
+ async fn refresh_from_server(
+ &self,
+ rpc_client: &Arc<RpcClient>,
+ metadata: &Arc<Metadata>,
+ ) -> Result<HashMap<String, String>> {
+ let cluster = metadata.get_cluster();
+ let server_node = cluster
+ .get_coordinator_server()
+ .or_else(|| Some(cluster.get_one_available_server()))
+ .expect("no available server to fetch security token");
+ let conn = rpc_client.get_connection(server_node).await?;
+
+ let request = GetSecurityTokenRequest::new();
+ let response = conn.request(request).await?;
+
+ let credentials: Credentials = serde_json::from_slice(&response.token)
+ .map_err(|e| Error::JsonSerdeError(e.to_string()))?;
+
+ let mut addition_infos = HashMap::new();
+ for kv in &response.addition_info {
+ addition_infos.insert(kv.key.clone(), kv.value.clone());
+ }
+
+ let cached = CachedToken {
+ access_key_id: credentials.access_key_id,
+ secret_access_key: credentials.access_key_secret,
+ security_token: credentials.security_token,
+ addition_infos,
+ cached_at: Instant::now(),
+ };
+
+ let props = cached.to_s3_props();
+ *self.inner.write() = Some(cached);
+
+ Ok(props)
+ }
Review Comment:
The credentials cache has a fixed TTL of 1 hour (3600 seconds), but the
response from the server includes an optional `expiration_time` field (see
fluss_api.proto line 313). Consider using the server-provided expiration time
when available instead of using a fixed duration, as the server may issue
credentials with different lifetimes based on security policies.
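For illustration, a hedged sketch of deriving the cache lifetime from a server-supplied expiration; treating `expiration_time` as milliseconds since the Unix epoch is an assumption read off the field name, not a documented contract:
```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600);

/// Remaining lifetime for cached credentials: honor the server-provided
/// expiration when it is present and still in the future, otherwise fall
/// back to the fixed default TTL.
fn cache_ttl_from_expiration(expiration_time_ms: Option<i64>) -> Duration {
    expiration_time_ms
        .and_then(|expires_at_ms| {
            let now_ms = SystemTime::now().duration_since(UNIX_EPOCH).ok()?.as_millis() as i64;
            let remaining_ms = expires_at_ms.checked_sub(now_ms)?;
            (remaining_ms > 0).then(|| Duration::from_millis(remaining_ms as u64))
        })
        .unwrap_or(DEFAULT_CACHE_TTL)
}
```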
##########
crates/fluss/src/client/table/remote_log.rs:
##########
@@ -169,12 +185,35 @@ impl RemoteLogDownloader {
// Create FileIO from the remote log tablet dir URL to get the storage
let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?;
+ // For S3/S3A URLs, inject S3 credentials from props
+ let file_io_builder = if remote_log_tablet_dir.starts_with("s3://")
+ || remote_log_tablet_dir.starts_with("s3a://")
+ {
+            file_io_builder.with_props(s3_props.iter().map(|(k, v)| (k.as_str(), v.as_str())))
+ } else {
+ file_io_builder
+ };
+
// Build storage and create operator directly
let storage = Storage::build(file_io_builder)?;
let (op, relative_path) = storage.create(remote_path)?;
- // Get file metadata to know the size
- let meta = op.stat(relative_path).await?;
+ // Timeout for remote storage operations (30 seconds)
+        const REMOTE_OP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
Review Comment:
The hardcoded timeout constant is defined locally inside the function. Consider
moving it to a module-level constant for better maintainability and for
consistency with storage_s3.rs (lines 32-33), where timeouts are defined at the
module level.
```suggestion
```
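A minimal sketch of the module-level placement being suggested; the constant name is carried over from the diff, and the referenced pattern in storage_s3.rs is taken from the comment above rather than verified here:
```rust
use std::time::Duration;

/// Timeout applied to individual remote storage operations (stat and chunked reads).
const REMOTE_OP_TIMEOUT: Duration = Duration::from_secs(30);
```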
##########
crates/fluss/src/record/arrow.rs:
##########
@@ -809,6 +839,35 @@ impl ReadContext {
};
Ok(Some(record_batch))
}
+
+    pub fn record_batch_for_remote_log(&self, data: &[u8]) -> Result<Option<RecordBatch>> {
+        let (batch_metadata, body_buffer, version) = match parse_ipc_message(data) {
+ Some(result) => result,
+ None => return Ok(None),
+ };
+
+ let record_batch = read_record_batch(
+ &body_buffer,
+ batch_metadata,
+ self.full_schema.clone(),
+ &std::collections::HashMap::new(),
+ None,
+ &version,
+ )?;
+
+ let record_batch = match &self.projection {
+ Some(projection) => {
+ let projected_columns: Vec<_> = projection
+ .projected_fields
+ .iter()
+ .map(|&idx| record_batch.column(idx).clone())
+ .collect();
+                RecordBatch::try_new(self.target_schema.clone(), projected_columns)?
+ }
+ None => record_batch,
+ };
+ Ok(Some(record_batch))
Review Comment:
The comment describes client-side projection, but the implementation differs
from the regular `record_batch()` method. The regular method applies
ordered_schema when projection exists (line 811), while this method always uses
full_schema (line 852). This inconsistency could lead to incorrect data parsing
when projections are involved. Consider documenting why this difference is
necessary or aligning the implementations if the difference is unintentional.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]