Copilot commented on code in PR #76: URL: https://github.com/apache/fluss-rust/pull/76#discussion_r2596109925
########## crates/fluss/src/client/table/remote_log.rs: ########## @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{Error, Result}; +use crate::io::{FileIO, Storage}; +use crate::metadata::TableBucket; +use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; +use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord}; +use crate::util::delete_file; +use std::collections::HashMap; +use std::io; +use std::path::{Path, PathBuf}; +use tempfile::TempDir; +use tokio::io::AsyncWriteExt; +use tokio::sync::oneshot; + +/// Represents a remote log segment that needs to be downloaded +#[derive(Debug, Clone)] +pub struct RemoteLogSegment { + pub segment_id: String, + pub start_offset: i64, + #[allow(dead_code)] + pub end_offset: i64, + #[allow(dead_code)] + pub size_in_bytes: i32, + pub table_bucket: TableBucket, +} + +impl RemoteLogSegment { + pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self { + Self { + segment_id: segment.remote_log_segment_id.clone(), + start_offset: segment.remote_log_start_offset, + end_offset: segment.remote_log_end_offset, + size_in_bytes: segment.segment_size_in_bytes, + table_bucket, + } + } + + /// Get the local file name 
for this remote log segment + pub fn local_file_name(&self) -> String { + // Format: ${remote_segment_id}_${offset_prefix}.log + let offset_prefix = format!("{:020}", self.start_offset); + format!("{}_{}.log", self.segment_id, offset_prefix) + } +} + +/// Represents remote log fetch information +#[derive(Debug, Clone)] +pub struct RemoteLogFetchInfo { + pub remote_log_tablet_dir: String, + #[allow(dead_code)] + pub partition_name: Option<String>, + pub remote_log_segments: Vec<RemoteLogSegment>, + pub first_start_pos: i32, +} + +impl RemoteLogFetchInfo { + pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> { + let segments = info + .remote_log_segments + .iter() + .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone())) + .collect(); + + Ok(Self { + remote_log_tablet_dir: info.remote_log_tablet_dir.clone(), + partition_name: info.partition_name.clone(), + remote_log_segments: segments, + first_start_pos: info.first_start_pos.unwrap_or(0), + }) + } +} + +/// Future for a remote log download request +pub struct RemoteLogDownloadFuture { + receiver: Option<oneshot::Receiver<Result<PathBuf>>>, +} + +impl RemoteLogDownloadFuture { + pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self { + Self { + receiver: Some(receiver), + } + } + + /// Get the downloaded file path + pub async fn get_file_path(&mut self) -> Result<PathBuf> { + let receiver = self + .receiver + .take() + .ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?; + + receiver.await.map_err(|e| { + Error::Io(io::Error::other(format!( + "Download future cancelled: {e:?}" + ))) + })? + } +} + +/// Downloader for remote log segment files +pub struct RemoteLogDownloader { + local_log_dir: TempDir, +} + +impl RemoteLogDownloader { + pub fn new(local_log_dir: TempDir) -> Result<Self> { + Ok(Self { local_log_dir }) + } + + /// Request to fetch a remote log segment to local. This method is non-blocking. 
+ pub fn request_remote_log( + &self, + remote_log_tablet_dir: &str, + segment: &RemoteLogSegment, + ) -> Result<RemoteLogDownloadFuture> { + let (sender, receiver) = oneshot::channel(); + let local_file_name = segment.local_file_name(); + let local_file_path = self.local_log_dir.path().join(&local_file_name); + let remote_path = self.build_remote_path(remote_log_tablet_dir, segment); + let remote_log_tablet_dir = remote_log_tablet_dir.to_string(); + // Spawn async download task + tokio::spawn(async move { + let result = + Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await; + let _ = sender.send(result); + }); + Ok(RemoteLogDownloadFuture::new(receiver)) + } + + /// Build the remote path for a log segment + fn build_remote_path(&self, remote_log_tablet_dir: &str, segment: &RemoteLogSegment) -> String { + // Format: ${remote_log_tablet_dir}/${segment_id}/${offset_prefix}.log + let offset_prefix = format!("{:020}", segment.start_offset); + format!( + "{}/{}/{}.log", + remote_log_tablet_dir, segment.segment_id, offset_prefix + ) + } + + /// Download a file from remote storage to local using streaming read/write + async fn download_file( + remote_log_tablet_dir: &str, + remote_path: &str, + local_path: &Path, + ) -> Result<PathBuf> { + // Handle both URL (e.g., "s3://bucket/path") and local file paths + // If the path doesn't contain "://", treat it as a local file path + let remote_log_tablet_dir_url = if remote_log_tablet_dir.contains("://") { + remote_log_tablet_dir.to_string() + } else { + format!("file://{remote_log_tablet_dir}") + }; + + // Create FileIO from the remote log tablet dir URL to get the storage + let file_io_builder = FileIO::from_url(&remote_log_tablet_dir_url)?; + + // Build storage and create operator directly + let storage = Storage::build(file_io_builder)?; + let (op, relative_path) = storage.create(remote_path)?; + + // Get file metadata to know the size + let meta = op.stat(relative_path).await?; + let file_size 
= meta.content_length(); + + // Create local file for writing + let mut local_file = tokio::fs::File::create(local_path).await?; + + // Stream data from remote to local file in chunks + // opendal::Reader::read accepts a range, so we read in chunks + const CHUNK_SIZE: u64 = 64 * 1024; // 64KB chunks for efficient streaming + let mut offset = 0u64; + + while offset < file_size { + let end = std::cmp::min(offset + CHUNK_SIZE, file_size); + let range = offset..end; + + // Read chunk from remote storage + let chunk = op.read_with(relative_path).range(range.clone()).await?; + let bytes = chunk.to_bytes(); + + // Write chunk to local file + local_file.write_all(&bytes).await?; + + offset = end; + } + + // Ensure all data is flushed to disk + local_file.sync_all().await?; + + Ok(local_path.to_path_buf()) + } Review Comment: If the download fails after the local file has been partially written, the file may be left in an incomplete state. Consider cleaning up the partially downloaded file in case of an error to avoid potential data corruption or disk space leaks. ########## crates/fluss/src/io/file_io.rs: ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +use crate::error::*; +use std::collections::HashMap; +use std::ops::Range; +use std::sync::Arc; + +use bytes::Bytes; +use chrono::{DateTime, Utc}; +use opendal::Operator; + +use url::Url; + +use super::Storage; + +use crate::error::Result; + +#[derive(Clone, Debug)] +pub struct FileIO { + storage: Arc<Storage>, +} + +impl FileIO { + /// Try to infer file io scheme from path. + pub fn from_url(path: &str) -> Result<FileIOBuilder> { + let url = + Url::parse(path).map_err(|_| Error::IllegalArgument(format!("Invalid URL: {path}")))?; + Ok(FileIOBuilder::new(url.scheme())) + } + + /// Create a new input file to read data. + pub fn new_input(&self, path: &str) -> Result<InputFile> { + let (op, relative_path) = self.storage.create(path)?; + let path = path.to_string(); + let relative_path_pos = path.len() - relative_path.len(); + Ok(InputFile { + op, + path, + relative_path_pos, + }) + } +} Review Comment: The public `FileIO` struct and its methods lack documentation. As a public API, users need to understand what FileIO is for, how to use it, and what errors it might return. Consider adding doc comments explaining the purpose and usage of `from_url` and `new_input`. 
########## crates/fluss/src/client/table/scanner.rs: ########## @@ -239,19 +249,75 @@ impl LogFetcher { let fetch_log_for_buckets = pb_fetch_log_resp.buckets_resp; for fetch_log_for_bucket in fetch_log_for_buckets { - let mut fetch_records = vec![]; let bucket: i32 = fetch_log_for_bucket.bucket_id; let table_bucket = TableBucket::new(table_id, bucket); - if fetch_log_for_bucket.records.is_some() { + + // Check if this is a remote log fetch + if let Some(ref remote_log_fetch_info) = + fetch_log_for_bucket.remote_log_fetch_info + { + let remote_fetch_info = RemoteLogFetchInfo::from_proto( + remote_log_fetch_info, + table_bucket.clone(), + )?; + + if let Some(fetch_offset) = + self.log_scanner_status.get_bucket_offset(&table_bucket) + { + let high_watermark = fetch_log_for_bucket.high_watermark.unwrap_or(-1); + // Download and process remote log segments + let mut pos_in_log_segment = remote_fetch_info.first_start_pos; + let mut current_fetch_offset = fetch_offset; + // todo: make segment download parallelly Review Comment: Nit: "parallelly" is non-standard English; the idiomatic adverbial form is "in parallel" or "concurrently". ```suggestion // todo: make segment download in parallel ``` ########## crates/fluss/src/client/table/remote_log.rs: ########## @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{Error, Result}; +use crate::io::{FileIO, Storage}; +use crate::metadata::TableBucket; +use crate::proto::{PbRemoteLogFetchInfo, PbRemoteLogSegment}; +use crate::record::{LogRecordsBatchs, ReadContext, ScanRecord}; +use crate::util::delete_file; +use std::collections::HashMap; +use std::io; +use std::path::{Path, PathBuf}; +use tempfile::TempDir; +use tokio::io::AsyncWriteExt; +use tokio::sync::oneshot; + +/// Represents a remote log segment that needs to be downloaded +#[derive(Debug, Clone)] +pub struct RemoteLogSegment { + pub segment_id: String, + pub start_offset: i64, + #[allow(dead_code)] + pub end_offset: i64, + #[allow(dead_code)] + pub size_in_bytes: i32, + pub table_bucket: TableBucket, +} + +impl RemoteLogSegment { + pub fn from_proto(segment: &PbRemoteLogSegment, table_bucket: TableBucket) -> Self { + Self { + segment_id: segment.remote_log_segment_id.clone(), + start_offset: segment.remote_log_start_offset, + end_offset: segment.remote_log_end_offset, + size_in_bytes: segment.segment_size_in_bytes, + table_bucket, + } + } + + /// Get the local file name for this remote log segment + pub fn local_file_name(&self) -> String { + // Format: ${remote_segment_id}_${offset_prefix}.log + let offset_prefix = format!("{:020}", self.start_offset); + format!("{}_{}.log", self.segment_id, offset_prefix) + } +} + +/// Represents remote log fetch information +#[derive(Debug, Clone)] +pub struct RemoteLogFetchInfo { + pub remote_log_tablet_dir: String, + #[allow(dead_code)] + pub partition_name: Option<String>, + pub remote_log_segments: Vec<RemoteLogSegment>, + pub first_start_pos: i32, +} + +impl RemoteLogFetchInfo { + pub fn from_proto(info: &PbRemoteLogFetchInfo, table_bucket: TableBucket) -> Result<Self> { + let segments = info + .remote_log_segments + .iter() + .map(|s| RemoteLogSegment::from_proto(s, table_bucket.clone())) + 
.collect(); + + Ok(Self { + remote_log_tablet_dir: info.remote_log_tablet_dir.clone(), + partition_name: info.partition_name.clone(), + remote_log_segments: segments, + first_start_pos: info.first_start_pos.unwrap_or(0), + }) + } +} + +/// Future for a remote log download request +pub struct RemoteLogDownloadFuture { + receiver: Option<oneshot::Receiver<Result<PathBuf>>>, +} + +impl RemoteLogDownloadFuture { + pub fn new(receiver: oneshot::Receiver<Result<PathBuf>>) -> Self { + Self { + receiver: Some(receiver), + } + } + + /// Get the downloaded file path + pub async fn get_file_path(&mut self) -> Result<PathBuf> { + let receiver = self + .receiver + .take() + .ok_or_else(|| Error::Io(io::Error::other("Download future already consumed")))?; + + receiver.await.map_err(|e| { + Error::Io(io::Error::other(format!( + "Download future cancelled: {e:?}" + ))) + })? + } +} + +/// Downloader for remote log segment files +pub struct RemoteLogDownloader { + local_log_dir: TempDir, +} + +impl RemoteLogDownloader { + pub fn new(local_log_dir: TempDir) -> Result<Self> { + Ok(Self { local_log_dir }) + } + + /// Request to fetch a remote log segment to local. This method is non-blocking. 
+ pub fn request_remote_log( + &self, + remote_log_tablet_dir: &str, + segment: &RemoteLogSegment, + ) -> Result<RemoteLogDownloadFuture> { + let (sender, receiver) = oneshot::channel(); + let local_file_name = segment.local_file_name(); + let local_file_path = self.local_log_dir.path().join(&local_file_name); + let remote_path = self.build_remote_path(remote_log_tablet_dir, segment); + let remote_log_tablet_dir = remote_log_tablet_dir.to_string(); + // Spawn async download task + tokio::spawn(async move { + let result = + Self::download_file(&remote_log_tablet_dir, &remote_path, &local_file_path).await; + let _ = sender.send(result); + }); + Ok(RemoteLogDownloadFuture::new(receiver)) + } Review Comment: The local file path for downloaded segments is based only on segment metadata, which could lead to race conditions if the same segment is downloaded multiple times concurrently. Consider adding a unique identifier (e.g., timestamp or random suffix) to the local file name to ensure uniqueness, or implement a caching mechanism to avoid redundant downloads. ########## crates/fluss/src/io/mod.rs: ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +mod file_io; + +pub use file_io::*; + +mod storage; +pub use storage::*; + +#[cfg(feature = "storage-fs")] +mod storage_fs; +#[cfg(feature = "storage-fs")] +use storage_fs::*; +#[cfg(feature = "storage-memory")] +mod storage_memory; + +#[cfg(feature = "storage-memory")] +use storage_memory::*; Review Comment: The `io` module is now public but lacks module-level documentation. Since this introduces new public API for file I/O operations with remote storage, it should have documentation explaining its purpose, basic usage examples, and supported storage backends. ########## crates/fluss/src/io/storage.rs: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +use crate::error; +use crate::error::Result; +use crate::io::FileIOBuilder; +use opendal::{Operator, Scheme}; + +/// The storage carries all supported storage services in fluss +#[derive(Debug)] +pub enum Storage { + #[cfg(feature = "storage-memory")] + Memory, + #[cfg(feature = "storage-fs")] + LocalFs, +} + +impl Storage { + pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> { + let (scheme_str, _) = file_io_builder.into_parts(); + let scheme = Self::parse_scheme(&scheme_str)?; + + match scheme { + #[cfg(feature = "storage-memory")] + Scheme::Memory => Ok(Self::Memory), + #[cfg(feature = "storage-fs")] + Scheme::Fs => Ok(Self::LocalFs), + _ => Err(error::Error::IoUnsupported( + "Unsupported storage feature".to_string(), + )), + } + } + + pub(crate) fn create<'a>(&self, path: &'a str) -> Result<(Operator, &'a str)> { + match self { + #[cfg(feature = "storage-memory")] + Storage::Memory => { + let op = super::memory_config_build()?; + + if let Some(stripped) = path.strip_prefix("memory:/") { + Ok((op, stripped)) + } else { + Ok((op, &path[1..])) + } Review Comment: The path slicing logic assumes the path starts with a slash when no scheme prefix is found. `&path[1..]` panics if the path is empty (or if its first character is multi-byte), and it silently drops the first character when the path does not start with '/'. Consider adding validation (for example `path.strip_prefix('/')`) and returning an error for the empty case (`path.len() < 1`) or any path without a leading slash. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
