Copilot commented on code in PR #76: URL: https://github.com/apache/fluss-rust/pull/76#discussion_r2595908447
########## crates/fluss/tests/integration/table_remote_scan.rs: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::integration::fluss_cluster::FlussTestingCluster; +use once_cell::sync::Lazy; +use parking_lot::RwLock; +use std::sync::Arc; + +#[cfg(test)] +use test_env_helpers::*; + +// Module-level shared cluster instance (only for this test file) +static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> = + Lazy::new(|| Arc::new(RwLock::new(None))); + +#[cfg(test)] +#[before_all] +#[after_all] +mod table_remote_scan_test { + use super::SHARED_FLUSS_CLUSTER; + use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; + use crate::integration::utils::create_table; + use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; + use fluss::row::{GenericRow, InternalRow}; + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; Review Comment: Unused imports: `AtomicUsize` and `Ordering` are imported but not used in this test module. Consider removing them to keep the code clean. ```suggestion ``` ########## crates/fluss/tests/integration/fluss_cluster.rs: ########## @@ -189,6 +225,10 @@ impl FlussTestingCluster { } self.coordinator_server.stop().await.unwrap(); self.zookeeper.stop().await.unwrap(); + if let Some(remote_data_dir) = &self.remote_data_dir { + std::fs::remove_dir_all(remote_data_dir) + .expect("Failed to delete remote data directory"); Review Comment: Blocking I/O in async function: `std::fs::remove_dir_all` is a blocking operation that can block the async runtime. Use `tokio::fs::remove_dir_all` instead for proper async cleanup. 
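A minimal sketch of the non-blocking variant, assuming `stop` keeps its current async signature and `remote_data_dir` remains an `Option` holding the path, as in the diff above:

```rust
// Hedged sketch: same cleanup, but awaited on tokio's async fs API so the
// runtime worker thread is not blocked while the directory is removed.
if let Some(remote_data_dir) = &self.remote_data_dir {
    tokio::fs::remove_dir_all(remote_data_dir)
        .await
        .expect("Failed to delete remote data directory");
}
```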
########## crates/fluss/src/client/table/scanner.rs: ########## @@ -239,19 +252,73 @@ impl LogFetcher { let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; for fetch_log_for_bucket in fetch_log_for_buckets { - let mut fetch_records = vec![]; let bucket: i32 = fetch_log_for_bucket.bucket_id; let table_bucket = TableBucket::new(table_id, bucket); - if fetch_log_for_bucket.records.is_some() { + + // Check if this is a remote log fetch + if let Some(ref remote_log_fetch_info) = + fetch_log_for_bucket.remote_log_fetch_info + { + let remote_fetch_info = RemoteLogFetchInfo::from_proto( + remote_log_fetch_info, + table_bucket.clone(), + )?; + + if let Some(fetch_offset) = + self.log_scanner_status.get_bucket_offset(&table_bucket) + { + let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1); + // Download and process remote log segments + let mut pos_in_log_segment = remote_fetch_info.first_start_pos; + let mut current_fetch_offset = fetch_offset; + for (i, segment) in + remote_fetch_info.remote_log_segments.iter().enumerate() + { + if i > 0 { + pos_in_log_segment = 0; + current_fetch_offset = segment.start_offset; + } + + let download_future = + self.remote_log_downloader.request_remote_log( + &remote_fetch_info.remote_log_tablet_dir, + segment, + )?; + let pending_fetch = RemotePendingFetch::new( + segment.clone(), + download_future, + pos_in_log_segment, + current_fetch_offset, + high_watermark, + self.read_context.clone(), + ); + let remote_records = + pending_fetch.convert_to_completed_fetch().await?; + // Update offset and merge results + for (tb, records) in remote_records { + if let Some(last_record) = records.last() { + self.log_scanner_status + .update_offset(&tb, last_record.offset() + 1); + } + result.entry(tb).or_default().extend(records); + } Review Comment: [nitpick] Sequential download of remote segments can be inefficient when multiple segments need to be fetched. The current implementation processes segments sequentially (awaiting each download before starting the next). Consider downloading all segments concurrently and then processing them in order to improve performance, especially when there are multiple segments to fetch. ########## crates/fluss/src/io/mod.rs: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +pub use file_io::*; + +mod storage; +pub use storage::*; + +#[cfg(feature = "storage-fs")] +mod storage_fs; +#[cfg(feature = "storage-fs")] +use storage_fs::*; + +mod file_io; Review Comment: Module organization issue: The module declarations are inconsistent. `file_io` module is declared after being used on line 18. Move the `mod file_io;` declaration before `pub use file_io::*;` for better code organization and to follow Rust conventions. 
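For the `io/mod.rs` point, the reordered file would look roughly like this — the same items, with each `mod` declaration preceding its re-export:

```rust
// Declare modules before re-exporting from them.
mod file_io;
pub use file_io::*;

mod storage;
pub use storage::*;

#[cfg(feature = "storage-fs")]
mod storage_fs;
#[cfg(feature = "storage-fs")]
use storage_fs::*;
```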
########## crates/fluss/src/client/table/remote_log.rs: ########## @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{Error, Result}; +use crate::io::{FileIO, Storage}; +use crate::metadata::TableBucket; +use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; +use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord}; +use std::collections::HashMap; +use std::io; +use std::path::{Path, PathBuf}; +use tokio::io::AsyncWriteExt; +use tokio::sync::oneshot; +use tracing::warn; + +/// Represents a remote log segment that needs to be downloaded +#[derive(Debug, Clone)] +pub struct RemoteLogSegment { + pub segment_id: String, + pub start_offset: i64, + #[allow(dead_code)] + pub end_offset: i64, + #[allow(dead_code)] + pub size_in_bytes: i32, + pub table_bucket: TableBucket, +} + +impl RemoteLogSegment { + pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self { + Self { + segment_id: segment.remote_log_segment_id.clone(), + start_offset: segment.remote_log_start_offset, + end_offset: segment.remote_log_end_offset, + size_in_bytes: segment.segment_size_in_bytes, + table_bucket, + } + } + + /// Get the local file name for this remote log segment + pub fn local_file_name(&self) -> String { + // Format: ${remote_segment_id}_${offset_prefix}.log + let offset_prefix = format!("{:020}", self.start_offset); + format!("{}_{}.log", self.segment_id, offset_prefix) + } +} + +/// Represents remote log fetch information +#[derive(Debug, Clone)] +pub struct RemoteLogFetchInfo { + pub remote_log_tablet_dir: String, + #[allow(dead_code)] + pub partition_name: Option<String>, + pub remote_log_segments: Vec<RemoteLogSegment>, + pub first_start_pos: i32, +} + +impl RemoteLogFetchInfo { + pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> { + let segments = info + .remote_log_segments + .iter() + .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone())) + .collect(); + + Ok(Self { + remote_log_tablet_dir: info.remote_log_tablet_dir.clone(), + partition_name: info.partition_name.clone(), + remote_log_segments: segments, + first_start_pos: info.first_start_pos.unwrap_or(0), + }) + } +} + +/// Future for a remote log download request +pub struct RemoteLogDownloadFuture { + receiver: Option<oneshot::Receiver<Result<PathBuf>>>, + recycle_callback: Option<Box<dyn FnOnce() + Send>>, +} + +impl RemoteLogDownloadFuture { + pub fn new( + receiver: oneshot::Receiver<Result<PathBuf>>, + recycle_callback: Box<dyn FnOnce() + Send>, + ) -> Self { + Self { + receiver: Some(receiver), + recycle_callback: Some(recycle_callback), + } + } + + /// Get the downloaded file path (blocking) + pub async fn get_file_path(&mut self) -> Result<PathBuf> { + let receiver = self + 
.receiver + .take() + .ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?; + + receiver.await.map_err(|e| { + Error::Io(io::Error::other(format!( + "Download future cancelled: {e:?}" + ))) + })? + } + + /// Get the recycle callback + pub fn take_recycle_callback(&mut self) -> Option<Box<dyn FnOnce() + Send>> { + self.recycle_callback.take() + } +} + +/// Downloader for remote log segment files +pub struct RemoteLogDownloader { + local_log_dir: PathBuf, +} + +impl RemoteLogDownloader { + pub fn new<P: AsRef<Path>>(local_log_dir: P) -> Result<Self> { + let local_log_dir = local_log_dir.as_ref().to_path_buf(); + std::fs::create_dir_all(&local_log_dir)?; + + Ok(Self { local_log_dir }) + } + + /// Request to fetch a remote log segment to local. This method is non-blocking. + pub fn request_remote_log( + &self, + remote_log_tablet_dir: &str, + segment: &RemoteLogSegment, + ) -> Result<RemoteLogDownloadFuture> { + let (sender, receiver) = oneshot::channel(); + let local_file_name = segment.local_file_name(); + let local_file_path = self.local_log_dir.join(&local_file_name); + let remote_path = self.build_remote_path(remote_log_tablet_dir, segment); + let remote_log_tablet_dir = remote_log_tablet_dir.to_string(); + let recycle_callback: Box<dyn FnOnce() + Send> = Box::new({ + let local_file_path = local_file_path.clone(); + move || { + // Clean up the downloaded file + let _ = std::fs::remove_file(&local_file_path); + } + }); + + // Spawn async download task + tokio::spawn(async move { + let result = + Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await; + let _ = sender.send(result); + }); + + Ok(RemoteLogDownloadFuture::new(receiver, recycle_callback)) + } + + /// Build the remote path for a log segment + fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String { + // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log + let offset_prefix = format!("{:020}", segment.start_offset); + format!( + "{}/{}/{}.log", + remote_log_tablet_dir, segment.segment_id, offset_prefix + ) + } + + /// Download a file from remote storage to local using streaming read/write + async fn download_file( + remote_log_tablet_dir: &str, + remote_path: &str, + local_path: &Path, + ) -> Result<PathBuf> { + // Handle both URL (e.g., "s3://bucket/path") and local file paths + // If the path doesn't contain "://", treat it as a local file path + let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") { + remote_log_tablet_dir.to_string() + } else { + format!("file://{remote_log_tablet_dir}") + }; + + // Create FileIO from the remote log tablet dir URL to get the storage + let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?; + + // Build storage and create operator directly + let storage = Storage::build(file_io_builder)?; + let (op, relative_path) = storage.create(remote_path)?; + + // Get file metadata to know the size + let meta = op.stat(relative_path).await?; + let file_size = meta.content_length(); + + // Create local file for writing + let mut local_file = tokio::fs::File::create(local_path).await?; + + // Stream data from remote to local file in chunks + // opendal::Reader::read accepts a range, so we read in chunks + const CHUNK_SIZE: u64 = 64 * 1024; // 64KB chunks for efficient streaming + let mut offset = 0u64; + + while offset < file_size { + let end = std::cmp::min(offset + CHUNK_SIZE, file_size); + let range = offset..end; + + // Read chunk from remote storage + let 
chunk = op.read_with(relative_path).range(range.clone()).await?; + let bytes = chunk.to_bytes(); + + // Write chunk to local file + local_file.write_all(&bytes).await?; + + offset = end; + } + + // Ensure all data is flushed to disk + local_file.sync_all().await?; + + Ok(local_path.to_path_buf()) + } +} + +impl Drop for RemoteLogDownloader { + fn drop(&mut self) { + // Clean up the local log directory + if self.local_log_dir.exists() { + std::fs::remove_dir_all(&self.local_log_dir).unwrap_or_else(|_| { + warn!("Failed to delete local log dir: {:?}", &self.local_log_dir) + }); + } + } +} + +/// Pending fetch that waits for remote log file to be downloaded +pub struct RemotePendingFetch { + segment: RemoteLogSegment, + download_future: RemoteLogDownloadFuture, + pos_in_log_segment: i32, + #[allow(dead_code)] + fetch_offset: i64, + #[allow(dead_code)] + high_watermark: i64, + read_context: ReadContext, +} + +impl RemotePendingFetch { + pub fn new( + segment: RemoteLogSegment, + download_future: RemoteLogDownloadFuture, + pos_in_log_segment: i32, + fetch_offset: i64, + high_watermark: i64, + read_context: ReadContext, + ) -> Self { + Self { + segment, + download_future, + pos_in_log_segment, + fetch_offset, + high_watermark, + read_context, + } + } + + /// Convert to completed fetch by reading the downloaded file + pub async fn convert_to_completed_fetch( + mut self, + ) -> Result<HashMap<TableBucket, Vec<ScanRecord>>> { + let file_path = self.download_future.get_file_path().await?; Review Comment: Blocking I/O in async function: `std::fs::read` is a blocking operation that can block the async runtime. Use `tokio::fs::read` instead for better async performance. ########## crates/fluss/src/io/storage.rs: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +use crate::error; +use crate::error::Result; +use crate::io::FileIOBuilder; +use opendal::{Operator, Scheme}; + +/// The storage carries all supported storage services in paimon +#[derive(Debug)] +pub enum Storage { + #[cfg(feature = "storage-memory")] + Memory, + #[cfg(feature = "storage-fs")] + LocalFs, +} + +impl Storage { + pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> { + let (scheme_str, _) = file_io_builder.into_parts(); + let scheme = Self::parse_scheme(&scheme_str)?; + + match scheme { + #[cfg(feature = "storage-memory")] + Scheme::Memory => Ok(Self::Memory), + #[cfg(feature = "storage-fs")] + Scheme::Fs => Ok(Self::LocalFs), + _ => Err(error::Error::IoUnsupported( + "Unsupported storage feature".to_string(), + )), + } + } + + pub(crate) fn create<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> { + match self { + #[cfg(feature = "storage-memory")] + Storage::Memory => { + let op = super::memory_config_build()?; + + if let Some(stripped) = path.strip_prefix("memory:/") { + Ok((op, stripped)) + } else { + Ok((op, &path[1..])) + } + } + #[cfg(feature = "storage-fs")] + Storage::LocalFs => { + let op = super::fs_config_build()?; + if let Some(stripped) = path.strip_prefix("file:/") { + Ok((op, stripped)) + } else { + Ok((op, &path[1..])) + } + } + } + } Review Comment: Compilation error with no default features: If neither `storage-memory` nor `storage-fs` features are enabled, the `Storage` enum will be empty, causing compilation errors. The `create` method's match expression will also have no arms, resulting in unreachable pattern warnings or errors. Either make at least one storage backend mandatory in the default features or add a compile-time check to prevent building without any storage backend. ########## crates/fluss/src/client/table/remote_log.rs: ########## @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::error::{Error, Result}; +use crate::io::{FileIO, Storage}; +use crate::metadata::TableBucket; +use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; +use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord}; +use std::collections::HashMap; +use std::io; +use std::path::{Path, PathBuf}; +use tokio::io::AsyncWriteExt; +use tokio::sync::oneshot; +use tracing::warn; + +/// Represents a remote log segment that needs to be downloaded +#[derive(Debug, Clone)] +pub struct RemoteLogSegment { + pub segment_id: String, + pub start_offset: i64, + #[allow(dead_code)] + pub end_offset: i64, + #[allow(dead_code)] + pub size_in_bytes: i32, + pub table_bucket: TableBucket, +} + +impl RemoteLogSegment { + pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self { + Self { + segment_id: segment.remote_log_segment_id.clone(), + start_offset: segment.remote_log_start_offset, + end_offset: segment.remote_log_end_offset, + size_in_bytes: segment.segment_size_in_bytes, + table_bucket, + } + } + + /// Get the local file name for this remote log segment + pub fn local_file_name(&self) -> String { + // Format: ${remote_segment_id}_${offset_prefix}.log + let offset_prefix = format!("{:020}", self.start_offset); + format!("{}_{}.log", self.segment_id, offset_prefix) + } +} + +/// Represents remote log fetch information +#[derive(Debug, Clone)] +pub struct RemoteLogFetchInfo { + pub remote_log_tablet_dir: String, + #[allow(dead_code)] + pub partition_name: Option<String>, + pub remote_log_segments: Vec<RemoteLogSegment>, + pub first_start_pos: i32, +} + +impl RemoteLogFetchInfo { + pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> { + let segments = info + .remote_log_segments + .iter() + .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone())) + .collect(); + + Ok(Self { + remote_log_tablet_dir: info.remote_log_tablet_dir.clone(), + partition_name: info.partition_name.clone(), + remote_log_segments: segments, + first_start_pos: info.first_start_pos.unwrap_or(0), + }) + } +} + +/// Future for a remote log download request +pub struct RemoteLogDownloadFuture { + receiver: Option<oneshot::Receiver<Result<PathBuf>>>, + recycle_callback: Option<Box<dyn FnOnce() + Send>>, +} + +impl RemoteLogDownloadFuture { + pub fn new( + receiver: oneshot::Receiver<Result<PathBuf>>, + recycle_callback: Box<dyn FnOnce() + Send>, + ) -> Self { + Self { + receiver: Some(receiver), + recycle_callback: Some(recycle_callback), + } + } + + /// Get the downloaded file path (blocking) + pub async fn get_file_path(&mut self) -> Result<PathBuf> { + let receiver = self + .receiver + .take() + .ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?; + + receiver.await.map_err(|e| { + Error::Io(io::Error::other(format!( + "Download future cancelled: {e:?}" + ))) + })? + } + + /// Get the recycle callback + pub fn take_recycle_callback(&mut self) -> Option<Box<dyn FnOnce() + Send>> { + self.recycle_callback.take() + } +} + +/// Downloader for remote log segment files +pub struct RemoteLogDownloader { + local_log_dir: PathBuf, +} + +impl RemoteLogDownloader { + pub fn new<P: AsRef<Path>>(local_log_dir: P) -> Result<Self> { + let local_log_dir = local_log_dir.as_ref().to_path_buf(); + std::fs::create_dir_all(&local_log_dir)?; + + Ok(Self { local_log_dir }) + } + + /// Request to fetch a remote log segment to local. This method is non-blocking. 
+ pub fn request_remote_log( + &self, + remote_log_tablet_dir: &str, + segment: &RemoteLogSegment, + ) -> Result<RemoteLogDownloadFuture> { + let (sender, receiver) = oneshot::channel(); + let local_file_name = segment.local_file_name(); + let local_file_path = self.local_log_dir.join(&local_file_name); + let remote_path = self.build_remote_path(remote_log_tablet_dir, segment); + let remote_log_tablet_dir = remote_log_tablet_dir.to_string(); + let recycle_callback: Box<dyn FnOnce() + Send> = Box::new({ + let local_file_path = local_file_path.clone(); + move || { + // Clean up the downloaded file + let _ = std::fs::remove_file(&local_file_path); + } + }); + + // Spawn async download task + tokio::spawn(async move { + let result = + Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await; + let _ = sender.send(result); + }); + + Ok(RemoteLogDownloadFuture::new(receiver, recycle_callback)) + } + + /// Build the remote path for a log segment + fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String { + // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log + let offset_prefix = format!("{:020}", segment.start_offset); + format!( + "{}/{}/{}.log", + remote_log_tablet_dir, segment.segment_id, offset_prefix + ) + } + + /// Download a file from remote storage to local using streaming read/write + async fn download_file( + remote_log_tablet_dir: &str, + remote_path: &str, + local_path: &Path, + ) -> Result<PathBuf> { + // Handle both URL (e.g., "s3://bucket/path") and local file paths + // If the path doesn't contain "://", treat it as a local file path + let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") { + remote_log_tablet_dir.to_string() + } else { + format!("file://{remote_log_tablet_dir}") + }; + + // Create FileIO from the remote log tablet dir URL to get the storage + let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?; + + // Build storage and create operator directly + let storage = Storage::build(file_io_builder)?; + let (op, relative_path) = storage.create(remote_path)?; + + // Get file metadata to know the size + let meta = op.stat(relative_path).await?; + let file_size = meta.content_length(); + + // Create local file for writing + let mut local_file = tokio::fs::File::create(local_path).await?; + + // Stream data from remote to local file in chunks + // opendal::Reader::read accepts a range, so we read in chunks + const CHUNK_SIZE: u64 = 64 * 1024; // 64KB chunks for efficient streaming + let mut offset = 0u64; + + while offset < file_size { + let end = std::cmp::min(offset + CHUNK_SIZE, file_size); + let range = offset..end; + + // Read chunk from remote storage + let chunk = op.read_with(relative_path).range(range.clone()).await?; + let bytes = chunk.to_bytes(); + + // Write chunk to local file + local_file.write_all(&bytes).await?; + + offset = end; + } + + // Ensure all data is flushed to disk + local_file.sync_all().await?; + + Ok(local_path.to_path_buf()) + } +} + +impl Drop for RemoteLogDownloader { + fn drop(&mut self) { + // Clean up the local log directory + if self.local_log_dir.exists() { + std::fs::remove_dir_all(&self.local_log_dir).unwrap_or_else(|_| { + warn!("Failed to delete local log dir: {:?}", &self.local_log_dir) + }); + } + } Review Comment: The `Drop` implementation uses blocking `std::fs::remove_dir_all` which can block the async runtime if the `RemoteLogDownloader` is dropped in an async context. 
Consider using `tokio::fs::remove_dir_all` with proper async cleanup, or document that this struct should not be dropped in async contexts. Alternatively, implement an explicit async cleanup method. ```suggestion impl RemoteLogDownloader { /// Explicit async cleanup: removes the local log directory. /// Must be awaited, so call it from async contexts instead of relying on `Drop`. pub async fn cleanup(&self) -> Result<()> { if self.local_log_dir.exists() { if let Err(e) = tokio::fs::remove_dir_all(&self.local_log_dir).await { warn!("Failed to delete local log dir: {:?}: {}", &self.local_log_dir, e); return Err(Error::from(e)); } } Ok(()) } } ``` ########## crates/fluss/tests/integration/table_remote_scan.rs: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::integration::fluss_cluster::FlussTestingCluster; +use once_cell::sync::Lazy; +use parking_lot::RwLock; +use std::sync::Arc; + +#[cfg(test)] +use test_env_helpers::*; + +// Module-level shared cluster instance (only for this test file) +static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> = + Lazy::new(|| Arc::new(RwLock::new(None))); + +#[cfg(test)] +#[before_all] +#[after_all] +mod table_remote_scan_test { + use super::SHARED_FLUSS_CLUSTER; + use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; + use crate::integration::utils::create_table; + use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; + use fluss::row::{GenericRow, InternalRow}; + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use std::thread; + use std::thread::sleep; + use std::time::Duration; + use uuid::Uuid; + + fn before_all() { + // Create a new tokio runtime in a separate thread + let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); + thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); + rt.block_on(async { + // Create a temporary directory for remote data that can be accessed from both + // container and host. Use a fixed path so it's the same in container and host. + // On macOS, Docker Desktop may have issues with /tmp, so we use a path in the + // current working directory or user's home directory which Docker can access.
+ let temp_dir = std::env::current_dir() + .unwrap_or_else(|_| std::path::PathBuf::from(".")) + .join("target") + .join(format!("test-remote-data-{}", Uuid::new_v4())); + + // Remove existing directory if it exists to start fresh + let _ = std::fs::remove_dir_all(&temp_dir); + std::fs::create_dir_all(&temp_dir) + .expect("Failed to create temporary directory for remote data"); + println!("temp_dir: {:?}", temp_dir); + + // Verify directory was created and is accessible + if !temp_dir.exists() { + panic!("Remote data directory was not created: {:?}", temp_dir); + } + + // Get absolute path for Docker mount + let temp_dir = temp_dir + .canonicalize() + .expect("Failed to canonicalize remote data directory path"); + + let mut cluster_conf = HashMap::new(); + // set to a small size to make data can be tiered to remote + cluster_conf.insert("log.segment.file-size".to_string(), "120b".to_string()); + cluster_conf.insert( + "remote.log.task-interval-duration".to_string(), + "1s".to_string(), + ); + // remote.data.dir uses the same path in container and host + cluster_conf.insert( + "remote.data.dir".to_string(), + temp_dir.to_string_lossy().to_string(), + ); + + let cluster = + FlussTestingClusterBuilder::new_with_cluster_conf("test_table", &cluster_conf) + .with_remote_data_dir(temp_dir) + .build() + .await; + let mut guard = cluster_guard.write(); + *guard = Some(cluster); + }); + }) + .join() + .expect("Failed to create cluster"); + + // wait for 20 seconds to avoid the error like + // CoordinatorEventProcessor is not initialized yet + sleep(Duration::from_secs(20)); Review Comment: Hard-coded sleep duration of 20 seconds is a test smell that makes tests slow and potentially flaky. Consider implementing a proper polling mechanism with a timeout to wait for the CoordinatorEventProcessor to be initialized, or expose a health check endpoint that can be polled. ########## crates/fluss/src/io/file_io.rs: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::error::*; +use std::collections::HashMap; +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use opendal::Operator; + +use url::Url; + +use super::Storage; + +use crate::error::Result; + +#[derive(Clone, Debug)] +pub struct FileIO { + storage: Arc<Storage>, +} + +impl FileIO { + /// Try to infer file io scheme from path. + pub fn from_url(path: &str) -> Result<FileIOBuilder> { + let url = + Url::parse(path).map_err(|_| Error::IllegalArgument(format!("Invalid URL: {path}")))?; + Ok(FileIOBuilder::new(url.scheme())) + } + + /// Create a new input file to read data. 
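One possible shape for that poll, sketched with a hypothetical `is_ready` closure standing in for whatever readiness probe the cluster can expose (e.g. a successful admin RPC); `sleep` and `Duration` are already imported by the test module:

```rust
// Hypothetical polling helper: `is_ready` is a placeholder probe, not an
// existing cluster API. Fails the test if readiness is not reached in time.
fn wait_until_ready(mut is_ready: impl FnMut() -> bool, timeout: Duration) {
    let deadline = std::time::Instant::now() + timeout;
    while !is_ready() {
        assert!(
            std::time::Instant::now() < deadline,
            "cluster did not become ready within {timeout:?}"
        );
        sleep(Duration::from_millis(500));
    }
}
```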
+ /// + /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76> + pub fn new_input(&self, path: &str) -> Result<InputFile> { + let (op, relative_path) = self.storage.create(path)?; Review Comment: Documentation references Paimon instead of Fluss. This comment should be updated to reference Fluss-specific documentation or removed if not applicable. ```suggestion ``` ########## crates/fluss/src/io/file_io.rs: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use crate::error::*; +use std::collections::HashMap; +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use opendal::Operator; + +use url::Url; + +use super::Storage; + +use crate::error::Result; + +#[derive(Clone, Debug)] +pub struct FileIO { + storage: Arc<Storage>, +} + +impl FileIO { + /// Try to infer file io scheme from path. + pub fn from_url(path: &str) -> Result<FileIOBuilder> { + let url = Review Comment: Documentation references "paimon-java's Options" but this is a Fluss project. This comment appears to be copied from Paimon and should be updated to reference Fluss-specific documentation or removed if not applicable. ```suggestion ``` ########## crates/fluss/src/error.rs: ########## @@ -47,4 +47,16 @@ pub enum Error { #[error("Illegal argument error: {0}")] IllegalArgument(String), + + #[error("IO not supported error: {0}")] + IoUnsupported(String), + + #[error("IO operation failed on underlying storage")] Review Comment: The error message for `IoUnexpected` does not include the underlying error details, making debugging difficult. Consider updating the error message to include the error details, e.g., `#[error("IO operation failed on underlying storage: {0}")]` and unwrap the Box in the display. ```suggestion #[error("IO operation failed on underlying storage: {0}")] ``` ########## crates/fluss/src/client/table/remote_log.rs: ########## @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{Error, Result}; +use crate::io::{FileIO, Storage}; +use crate::metadata::TableBucket; +use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; +use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord}; +use std::collections::HashMap; +use std::io; +use std::path::{Path, PathBuf}; +use tokio::io::AsyncWriteExt; +use tokio::sync::oneshot; +use tracing::warn; + +/// Represents a remote log segment that needs to be downloaded +#[derive(Debug, Clone)] +pub struct RemoteLogSegment { + pub segment_id: String, + pub start_offset: i64, + #[allow(dead_code)] + pub end_offset: i64, + #[allow(dead_code)] + pub size_in_bytes: i32, + pub table_bucket: TableBucket, +} + +impl RemoteLogSegment { + pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self { + Self { + segment_id: segment.remote_log_segment_id.clone(), + start_offset: segment.remote_log_start_offset, + end_offset: segment.remote_log_end_offset, + size_in_bytes: segment.segment_size_in_bytes, + table_bucket, + } + } + + /// Get the local file name for this remote log segment + pub fn local_file_name(&self) -> String { + // Format: ${remote_segment_id}_${offset_prefix}.log + let offset_prefix = format!("{:020}", self.start_offset); + format!("{}_{}.log", self.segment_id, offset_prefix) + } +} + +/// Represents remote log fetch information +#[derive(Debug, Clone)] +pub struct RemoteLogFetchInfo { + pub remote_log_tablet_dir: String, + #[allow(dead_code)] + pub partition_name: Option<String>, + pub remote_log_segments: Vec<RemoteLogSegment>, + pub first_start_pos: i32, +} + +impl RemoteLogFetchInfo { + pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> { + let segments = info + .remote_log_segments + .iter() + .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone())) + .collect(); + + Ok(Self { + remote_log_tablet_dir: info.remote_log_tablet_dir.clone(), + partition_name: info.partition_name.clone(), + remote_log_segments: segments, + first_start_pos: info.first_start_pos.unwrap_or(0), + }) + } +} + +/// Future for a remote log download request +pub struct RemoteLogDownloadFuture { + receiver: Option<oneshot::Receiver<Result<PathBuf>>>, + recycle_callback: Option<Box<dyn FnOnce() + Send>>, +} + +impl RemoteLogDownloadFuture { + pub fn new( + receiver: oneshot::Receiver<Result<PathBuf>>, + recycle_callback: Box<dyn FnOnce() + Send>, + ) -> Self { + Self { + receiver: Some(receiver), + recycle_callback: Some(recycle_callback), + } + } + + /// Get the downloaded file path (blocking) + pub async fn get_file_path(&mut self) -> Result<PathBuf> { + let receiver = self + .receiver + .take() + .ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?; + + receiver.await.map_err(|e| { + Error::Io(io::Error::other(format!( + "Download future cancelled: {e:?}" + ))) + })? + } + + /// Get the recycle callback + pub fn take_recycle_callback(&mut self) -> Option<Box<dyn FnOnce() + Send>> { + self.recycle_callback.take() + } +} + +/// Downloader for remote log segment files Review Comment: Potential file leak: If the download task fails or the `RemoteLogDownloadFuture` is dropped before `get_file_path()` is called, the downloaded file may not be cleaned up since the recycle callback won't be invoked. Consider implementing a `Drop` for `RemoteLogDownloadFuture` that calls the recycle callback if it hasn't been consumed yet. 
```suggestion impl Drop for RemoteLogDownloadFuture { fn drop(&mut self) { if let Some(callback) = self.recycle_callback.take() { callback(); } } } ``` ########## crates/fluss/src/io/storage.rs: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::error; +use crate::error::Result; +use crate::io::FileIOBuilder; +use opendal::{Operator, Scheme}; + +/// The storage carries all supported storage services in paimon +#[derive(Debug)] +pub enum Storage { + #[cfg(feature = "storage-memory")] + Memory, + #[cfg(feature = "storage-fs")] + LocalFs, +} + +impl Storage { + pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> { + let (scheme_str, _) = file_io_builder.into_parts(); + let scheme = Self::parse_scheme(&scheme_str)?; + + match scheme { + #[cfg(feature = "storage-memory")] + Scheme::Memory => Ok(Self::Memory), + #[cfg(feature = "storage-fs")] + Scheme::Fs => Ok(Self::LocalFs), + _ => Err(error::Error::IoUnsupported( + "Unsupported storage feature".to_string(), + )), + } + } + + pub(crate) fn create<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> { + match self { + #[cfg(feature = "storage-memory")] + Storage::Memory => { + let op = super::memory_config_build()?; + + if let Some(stripped) = path.strip_prefix("memory:/") { + Ok((op, stripped)) + } else { + Ok((op, &path[1..])) + } + } + #[cfg(feature = "storage-fs")] + Storage::LocalFs => { + let op = super::fs_config_build()?; + if let Some(stripped) = path.strip_prefix("file:/") { + Ok((op, stripped)) + } else { + Ok((op, &path[1..])) + } Review Comment: Potential panic on path slicing: Lines 57 and 66 use `&path[1..]` which will panic if the path is empty. Add a check to ensure the path has at least one character before slicing, or handle the error case appropriately. 
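A minimal guard for the `LocalFs` arm (the `Memory` arm would mirror it), sketched with `strip_prefix` so an empty path falls through instead of panicking:

```rust
#[cfg(feature = "storage-fs")]
Storage::LocalFs => {
    let op = super::fs_config_build()?;
    if let Some(stripped) = path.strip_prefix("file:/") {
        Ok((op, stripped))
    } else {
        // str::strip_prefix returns None for "" where &path[1..] would panic.
        Ok((op, path.strip_prefix('/').unwrap_or(path)))
    }
}
```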
########## crates/fluss/src/client/table/scanner.rs: ########## @@ -239,19 +252,73 @@ impl LogFetcher { let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; for fetch_log_for_bucket in fetch_log_for_buckets { - let mut fetch_records = vec![]; let bucket: i32 = fetch_log_for_bucket.bucket_id; let table_bucket = TableBucket::new(table_id, bucket); - if fetch_log_for_bucket.records.is_some() { + + // Check if this is a remote log fetch + if let Some(ref remote_log_fetch_info) = + fetch_log_for_bucket.remote_log_fetch_info + { + let remote_fetch_info = RemoteLogFetchInfo::from_proto( + remote_log_fetch_info, + table_bucket.clone(), + )?; + + if let Some(fetch_offset) = + self.log_scanner_status.get_bucket_offset(&table_bucket) + { + let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1); + // Download and process remote log segments + let mut pos_in_log_segment = remote_fetch_info.first_start_pos; + let mut current_fetch_offset = fetch_offset; + for (i, segment) in + remote_fetch_info.remote_log_segments.iter().enumerate() + { + if i > 0 { + pos_in_log_segment = 0; + current_fetch_offset = segment.start_offset; + } + + let download_future = + self.remote_log_downloader.request_remote_log( + &remote_fetch_info.remote_log_tablet_dir, + segment, + )?; + let pending_fetch = RemotePendingFetch::new( + segment.clone(), + download_future, + pos_in_log_segment, + current_fetch_offset, + high_watermark, + self.read_context.clone(), + ); + let remote_records = + pending_fetch.convert_to_completed_fetch().await?; + // Update offset and merge results + for (tb, records) in remote_records { + if let Some(last_record) = records.last() { + self.log_scanner_status + .update_offset(&tb, last_record.offset() + 1); + } + result.entry(tb).or_default().extend(records); + } + } + } else { Review Comment: Incomplete comment. The comment "// if the offset is null, it means the bucket has been unsubscribed," is missing proper termination and explanation. It should explain what happens after the bucket is unsubscribed (i.e., the code continues to the next iteration). ```suggestion // If the offset is None, it means the bucket has been unsubscribed; skip processing and continue to the next bucket. ``` ########## crates/fluss/src/client/table/remote_log.rs: ########## @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::error::{Error, Result}; +use crate::io::{FileIO, Storage}; +use crate::metadata::TableBucket; +use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; +use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord}; +use std::collections::HashMap; +use std::io; +use std::path::{Path, PathBuf}; +use tokio::io::AsyncWriteExt; +use tokio::sync::oneshot; +use tracing::warn; + +/// Represents a remote log segment that needs to be downloaded +#[derive(Debug, Clone)] +pub struct RemoteLogSegment { + pub segment_id: String, + pub start_offset: i64, + #[allow(dead_code)] + pub end_offset: i64, + #[allow(dead_code)] + pub size_in_bytes: i32, + pub table_bucket: TableBucket, +} + +impl RemoteLogSegment { + pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self { + Self { + segment_id: segment.remote_log_segment_id.clone(), + start_offset: segment.remote_log_start_offset, + end_offset: segment.remote_log_end_offset, + size_in_bytes: segment.segment_size_in_bytes, + table_bucket, + } + } + + /// Get the local file name for this remote log segment + pub fn local_file_name(&self) -> String { + // Format: ${remote_segment_id}_${offset_prefix}.log + let offset_prefix = format!("{:020}", self.start_offset); + format!("{}_{}.log", self.segment_id, offset_prefix) + } +} + +/// Represents remote log fetch information +#[derive(Debug, Clone)] +pub struct RemoteLogFetchInfo { + pub remote_log_tablet_dir: String, + #[allow(dead_code)] + pub partition_name: Option<String>, + pub remote_log_segments: Vec<RemoteLogSegment>, + pub first_start_pos: i32, +} + +impl RemoteLogFetchInfo { + pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> { + let segments = info + .remote_log_segments + .iter() + .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone())) + .collect(); + + Ok(Self { + remote_log_tablet_dir: info.remote_log_tablet_dir.clone(), + partition_name: info.partition_name.clone(), + remote_log_segments: segments, + first_start_pos: info.first_start_pos.unwrap_or(0), + }) + } +} + +/// Future for a remote log download request +pub struct RemoteLogDownloadFuture { + receiver: Option<oneshot::Receiver<Result<PathBuf>>>, + recycle_callback: Option<Box<dyn FnOnce() + Send>>, +} + +impl RemoteLogDownloadFuture { + pub fn new( + receiver: oneshot::Receiver<Result<PathBuf>>, + recycle_callback: Box<dyn FnOnce() + Send>, + ) -> Self { + Self { + receiver: Some(receiver), + recycle_callback: Some(recycle_callback), + } + } + + /// Get the downloaded file path (blocking) Review Comment: Inaccurate documentation: The comment states "(blocking)" but the function is actually `async` and non-blocking. The comment should be corrected or removed. ```suggestion /// Get the downloaded file path ``` ########## crates/fluss/src/io/storage.rs: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::error; +use crate::error::Result; +use crate::io::FileIOBuilder; +use opendal::{Operator, Scheme}; + +/// The storage carries all supported storage services in paimon Review Comment: The comment references "paimon" but this is a Fluss project. This appears to be copied from the Paimon codebase and should be updated to reference Fluss instead. ```suggestion /// The storage carries all supported storage services in Fluss ``` ########## crates/fluss/tests/integration/table_remote_scan.rs: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use crate::integration::fluss_cluster::FlussTestingCluster; +use once_cell::sync::Lazy; +use parking_lot::RwLock; +use std::sync::Arc; + +#[cfg(test)] +use test_env_helpers::*; + +// Module-level shared cluster instance (only for this test file) +static SHARED_FLUSS_CLUSTER: Lazy<Arc<RwLock<Option<FlussTestingCluster>>>> = + Lazy::new(|| Arc::new(RwLock::new(None))); + +#[cfg(test)] +#[before_all] +#[after_all] +mod table_remote_scan_test { + use super::SHARED_FLUSS_CLUSTER; + use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; + use crate::integration::utils::create_table; + use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; + use fluss::row::{GenericRow, InternalRow}; + use std::collections::HashMap; + use std::sync::Arc; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use std::thread; + use std::thread::sleep; + use std::time::Duration; + use uuid::Uuid; + + fn before_all() { + // Create a new tokio runtime in a separate thread + let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); + thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); + rt.block_on(async { + // Create a temporary directory for remote data that can be accessed from both + // container and host. Use a fixed path so it's the same in container and host. + // On macOS, Docker Desktop may have issues with /tmp, so we use a path in the + // current working directory or user's home directory which Docker can access. 
+ let temp_dir = std::env::current_dir() + .unwrap_or_else(|_| std::path::PathBuf::from(".")) + .join("target") + .join(format!("test-remote-data-{}", Uuid::new_v4())); + + // Remove existing directory if it exists to start fresh + let _ = std::fs::remove_dir_all(&temp_dir); + std::fs::create_dir_all(&temp_dir) + .expect("Failed to create temporary directory for remote data"); + println!("temp_dir: {:?}", temp_dir); + + // Verify directory was created and is accessible + if !temp_dir.exists() { + panic!("Remote data directory was not created: {:?}", temp_dir); + } + + // Get absolute path for Docker mount + let temp_dir = temp_dir + .canonicalize() + .expect("Failed to canonicalize remote data directory path"); + + let mut cluster_conf = HashMap::new(); + // set to a small size to make data can be tiered to remote + cluster_conf.insert("log.segment.file-size".to_string(), "120b".to_string()); + cluster_conf.insert( + "remote.log.task-interval-duration".to_string(), + "1s".to_string(), + ); + // remote.data.dir uses the same path in container and host + cluster_conf.insert( + "remote.data.dir".to_string(), + temp_dir.to_string_lossy().to_string(), + ); + + let cluster = + FlussTestingClusterBuilder::new_with_cluster_conf("test_table", &cluster_conf) + .with_remote_data_dir(temp_dir) + .build() + .await; + let mut guard = cluster_guard.write(); + *guard = Some(cluster); + }); + }) + .join() + .expect("Failed to create cluster"); + + // wait for 20 seconds to avoid the error like + // CoordinatorEventProcessor is not initialized yet + sleep(Duration::from_secs(20)); + } + + fn after_all() { + // Create a new tokio runtime in a separate thread + let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); + thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); + rt.block_on(async { + let mut guard = cluster_guard.write(); + if let Some(cluster) = guard.take() { + cluster.stop().await; + } + }); + }) + .join() + .expect("Failed to cleanup cluster"); + } + + #[tokio::test] + async fn test_scan_remote_log() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new( + "fluss".to_string(), + "test_append_record_batch_and_scan".to_string(), + ); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("c1", DataTypes::int()) + .column("c2", DataTypes::string()) + .build() + .expect("Failed to build schema"), + ) + .property("table.log.arrow.compression.type", "NONE") + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer(); + + // append 20 rows, there must be some tiered to remote + let record_count = 20; + for i in 0..record_count { + let mut row = GenericRow::new(); + row.set_field(0, i as i32); + let v = format!("v{}", i); + row.set_field(1, v.as_str()); + append_writer + .append(row) + .await + .expect("Failed to append row"); + } + + // Review Comment: Empty comment on line 174. Remove this line or add a meaningful comment explaining what follows. 
```suggestion // Create a log scanner and subscribe to all buckets to read appended records ``` ########## crates/fluss/src/client/table/remote_log.rs: ########## @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{Error, Result}; +use crate::io::{FileIO, Storage}; +use crate::metadata::TableBucket; +use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; +use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord}; +use std::collections::HashMap; +use std::io; +use std::path::{Path, PathBuf}; +use tokio::io::AsyncWriteExt; +use tokio::sync::oneshot; +use tracing::warn; + +/// Represents a remote log segment that needs to be downloaded +#[derive(Debug, Clone)] +pub struct RemoteLogSegment { + pub segment_id: String, + pub start_offset: i64, + #[allow(dead_code)] + pub end_offset: i64, + #[allow(dead_code)] + pub size_in_bytes: i32, + pub table_bucket: TableBucket, +} + +impl RemoteLogSegment { + pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self { + Self { + segment_id: segment.remote_log_segment_id.clone(), + start_offset: segment.remote_log_start_offset, + end_offset: segment.remote_log_end_offset, + size_in_bytes: segment.segment_size_in_bytes, + table_bucket, + } + } + + /// Get the local file name for this remote log segment + pub fn local_file_name(&self) -> String { + // Format: ${remote_segment_id}_${offset_prefix}.log + let offset_prefix = format!("{:020}", self.start_offset); + format!("{}_{}.log", self.segment_id, offset_prefix) + } +} + +/// Represents remote log fetch information +#[derive(Debug, Clone)] +pub struct RemoteLogFetchInfo { + pub remote_log_tablet_dir: String, + #[allow(dead_code)] + pub partition_name: Option<String>, + pub remote_log_segments: Vec<RemoteLogSegment>, + pub first_start_pos: i32, +} + +impl RemoteLogFetchInfo { + pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> { + let segments = info + .remote_log_segments + .iter() + .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone())) + .collect(); + + Ok(Self { + remote_log_tablet_dir: info.remote_log_tablet_dir.clone(), + partition_name: info.partition_name.clone(), + remote_log_segments: segments, + first_start_pos: info.first_start_pos.unwrap_or(0), + }) + } +} + +/// Future for a remote log download request +pub struct RemoteLogDownloadFuture { + receiver: Option<oneshot::Receiver<Result<PathBuf>>>, + recycle_callback: Option<Box<dyn FnOnce() + Send>>, +} + +impl RemoteLogDownloadFuture { + pub fn new( + receiver: oneshot::Receiver<Result<PathBuf>>, + recycle_callback: Box<dyn FnOnce() + Send>, + ) -> Self { + Self { + receiver: Some(receiver), + recycle_callback: Some(recycle_callback), + } + } + + /// Get the downloaded file path 
(blocking) + pub async fn get_file_path(&mut self) -> Result<PathBuf> { + let receiver = self + .receiver + .take() + .ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?; + + receiver.await.map_err(|e| { + Error::Io(io::Error::other(format!( + "Download future cancelled: {e:?}" + ))) + })? + } + + /// Get the recycle callback + pub fn take_recycle_callback(&mut self) -> Option<Box<dyn FnOnce() + Send>> { + self.recycle_callback.take() + } +} + +/// Downloader for remote log segment files +pub struct RemoteLogDownloader { + local_log_dir: PathBuf, +} + +impl RemoteLogDownloader { + pub fn new<P: AsRef<Path>>(local_log_dir: P) -> Result<Self> { + let local_log_dir = local_log_dir.as_ref().to_path_buf(); + std::fs::create_dir_all(&local_log_dir)?; + + Ok(Self { local_log_dir }) + } + + /// Request to fetch a remote log segment to local. This method is non-blocking. + pub fn request_remote_log( + &self, + remote_log_tablet_dir: &str, + segment: &RemoteLogSegment, + ) -> Result<RemoteLogDownloadFuture> { + let (sender, receiver) = oneshot::channel(); + let local_file_name = segment.local_file_name(); + let local_file_path = self.local_log_dir.join(&local_file_name); + let remote_path = self.build_remote_path(remote_log_tablet_dir, segment); + let remote_log_tablet_dir = remote_log_tablet_dir.to_string(); + let recycle_callback: Box<dyn FnOnce() + Send> = Box::new({ + let local_file_path = local_file_path.clone(); + move || { + // Clean up the downloaded file + let _ = std::fs::remove_file(&local_file_path); + } + }); + + // Spawn async download task + tokio::spawn(async move { + let result = + Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await; + let _ = sender.send(result); + }); + + Ok(RemoteLogDownloadFuture::new(receiver, recycle_callback)) + } + + /// Build the remote path for a log segment + fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String { + // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log + let offset_prefix = format!("{:020}", segment.start_offset); + format!( + "{}/{}/{}.log", + remote_log_tablet_dir, segment.segment_id, offset_prefix + ) + } + + /// Download a file from remote storage to local using streaming read/write + async fn download_file( + remote_log_tablet_dir: &str, + remote_path: &str, + local_path: &Path, + ) -> Result<PathBuf> { + // Handle both URL (e.g., "s3://bucket/path") and local file paths + // If the path doesn't contain "://", treat it as a local file path + let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") { + remote_log_tablet_dir.to_string() + } else { + format!("file://{remote_log_tablet_dir}") + }; + + // Create FileIO from the remote log tablet dir URL to get the storage + let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?; + + // Build storage and create operator directly + let storage = Storage::build(file_io_builder)?; + let (op, relative_path) = storage.create(remote_path)?; + + // Get file metadata to know the size + let meta = op.stat(relative_path).await?; + let file_size = meta.content_length(); + + // Create local file for writing + let mut local_file = tokio::fs::File::create(local_path).await?; + + // Stream data from remote to local file in chunks + // opendal::Reader::read accepts a range, so we read in chunks + const CHUNK_SIZE: u64 = 64 * 1024; // 64KB chunks for efficient streaming + let mut offset = 0u64; + + while offset < file_size { + let end = std::cmp::min(offset + 
CHUNK_SIZE, file_size); + let range = offset..end; + + // Read chunk from remote storage + let chunk = op.read_with(relative_path).range(range.clone()).await?; + let bytes = chunk.to_bytes(); + + // Write chunk to local file + local_file.write_all(&bytes).await?; + + offset = end; + } + + // Ensure all data is flushed to disk + local_file.sync_all().await?; + + Ok(local_path.to_path_buf()) + } +} Review Comment: Race condition and file existence check missing: The `download_file` function doesn't check if the file already exists at `local_path` before downloading. If multiple concurrent requests are made for the same segment (e.g., due to retries), multiple downloads could occur simultaneously, potentially causing file corruption or wasted bandwidth. Consider checking if the file exists first, or use atomic file creation with appropriate locking. ########## crates/fluss/Cargo.toml: ########## @@ -45,16 +53,16 @@ ordered-float = { version = "4", features = ["serde"] } parse-display = "0.10" ref-cast = "1.0" chrono = { workspace = true } -oneshot = "0.1.11" +opendal = { version = "0.49", features = ["services-fs"] } Review Comment: The `opendal` dependency is declared with `features = ["services-fs"]` directly, but storage features are supposed to be conditionally enabled. This creates a conflict with the feature flags defined in lines 29-30. The dependency should either be declared without features (letting the feature flags control them) or should use a conditional dependency declaration. ```suggestion opendal = { version = "0.49" } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
