Copilot commented on code in PR #501: URL: https://github.com/apache/hudi-rs/pull/501#discussion_r2648544502
########## crates/core/src/timeline/completion_time.rs: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +//! Completion time query view for looking up completion timestamps from request timestamps. +//! +//! This module provides the [`TimelineViewByCompletionTime`] trait and implementations +//! for mapping request timestamps to completion timestamps. This is essential for +//! v8+ tables where file names contain request timestamps but file ordering and +//! association must be based on completion timestamps. +//! +//! # Table Version Differences +//! +//! - **v6 tables**: Completion timestamps are not tracked. Use [`V6CompletionTimeView`] +//! which returns `None` for all lookups (completion_timestamp remains unset). +//! +//! - **v8+ tables**: Request and completion timestamps are different. The completion timestamp +//! is stored in the timeline instant filename as `{request}_{completion}.{action}`. +//! Use [`CompletionTimeView`] to look up completion timestamps from a map. + +use crate::timeline::instant::Instant; +use std::collections::HashMap; + +/// A view for querying completion timestamps from request timestamps. 
+/// +/// This trait abstracts the completion time lookup logic to support both +/// v6 tables (where completion time is not tracked) and v8+ tables (where +/// request and completion timestamps differ). +pub trait TimelineViewByCompletionTime { + /// Get the completion timestamp for a given request timestamp. + /// + /// Returns `Some(completion_timestamp)` if the request timestamp corresponds + /// to a completed commit, `None` if the commit is pending/unknown or if + /// completion time is not tracked (v6 tables). + fn get_completion_time(&self, request_timestamp: &str) -> Option<&str>; + + /// Returns true if uncommitted files should be filtered out. + /// + /// For v8+ tables, this returns true because we track completion times + /// and can distinguish committed from uncommitted files. + /// For v6 tables, this returns false because completion time is not tracked. + fn should_filter_uncommitted(&self) -> bool { + false + } +} + +/// Completion time view for v6 tables. +/// +/// In v6, completion time is not tracked separately from request time. +/// This view always returns `None`, meaning the caller should use the +/// request timestamp directly for any completion-time-based logic. +#[derive(Debug, Default)] +pub struct V6CompletionTimeView; + +impl V6CompletionTimeView { + pub fn new() -> Self { + Self + } +} + +impl TimelineViewByCompletionTime for V6CompletionTimeView { + fn get_completion_time(&self, _request_timestamp: &str) -> Option<&str> { + None + } +} + +/// Completion time view for v8+ tables backed by a HashMap. +/// +/// This view is built from timeline instants and maps request timestamps +/// to their corresponding completion timestamps. +#[derive(Debug)] +pub struct CompletionTimeView { + /// Map from request timestamp to completion timestamp + request_to_completion: HashMap<String, String>, +} + +impl CompletionTimeView { + /// Create a new completion time view from timeline instants. 
+ /// + /// Only completed instants with a completion timestamp are included. + pub fn from_instants<'a, I>(instants: I) -> Self + where + I: IntoIterator<Item = &'a Instant>, + { + let request_to_completion = instants + .into_iter() + .filter_map(|instant| { + instant + .completed_timestamp + .as_ref() + .map(|completion_ts| (instant.timestamp.clone(), completion_ts.clone())) + }) + .collect(); + + Self { + request_to_completion, + } + } + + /// Create a new empty completion time view. + pub fn empty() -> Self { + Self { + request_to_completion: HashMap::new(), + } + } + + /// Check if this view has any completion time mappings. + pub fn is_empty(&self) -> bool { + self.request_to_completion.is_empty() + } + + /// Get the number of completion time mappings. + pub fn len(&self) -> usize { + self.request_to_completion.len() + } +} + +impl TimelineViewByCompletionTime for CompletionTimeView { + fn get_completion_time(&self, request_timestamp: &str) -> Option<&str> { + self.request_to_completion + .get(request_timestamp) + .map(|s| s.as_str()) + } + + fn should_filter_uncommitted(&self) -> bool { + // Only filter if we have completion time data (v8+ table) + !self.is_empty() + } +} Review Comment: The test coverage for `should_filter_uncommitted()` is missing. This is a critical method that determines whether uncommitted files should be filtered out. Consider adding tests that verify: 1. V6CompletionTimeView returns false (v6 tables don't filter uncommitted files) 2. Empty CompletionTimeView returns false 3. Non-empty CompletionTimeView returns true ########## crates/core/src/table/mod.rs: ########## @@ -533,18 +559,81 @@ impl Table { timestamp: &str, filters: &[Filter], ) -> Result<Vec<FileSlice>> { + // Create completion time view for setting completion timestamps on files. + // For v8+ tables, this populates completion timestamps and enables filtering. + // For v6 tables, the view is empty and acts as a no-op. 
+ let completion_time_view = self.timeline.create_completion_time_view(); + + // File slices are keyed by commit_timestamp (request/base instant time). let excludes = self .timeline .get_replaced_file_groups_as_of(timestamp) .await?; let partition_schema = self.get_partition_schema().await?; let partition_pruner = PartitionPruner::new(filters, &partition_schema, self.hudi_configs.as_ref())?; + + // Try metadata table accelerated listing if files partition is configured + let metadata_table_records = if self.is_metadata_table_enabled() { + match self.fetch_metadata_table_records_if_valid(timestamp).await { + Ok(records) => Some(records), + Err(e) => { + // Fall through to storage listing if metadata table read fails + log::warn!( + "Failed to read file slices from metadata table, falling back to storage listing: {}", + e + ); + None + } + } + } else { + None + }; + + // Use the single entrypoint that handles both metadata table and storage listing self.file_system_view - .get_file_slices_as_of(timestamp, &partition_pruner, &excludes) + .get_file_slices_as_of( + timestamp, + &partition_pruner, + &excludes, + metadata_table_records.as_ref(), + Some(&completion_time_view), + ) .await } + /// Fetch metadata table records if the metadata table is up-to-date for the query timestamp. + /// + /// Returns an error if the metadata table is behind the query timestamp, indicating + /// that the caller should fall back to storage listing. + async fn fetch_metadata_table_records_if_valid( + &self, + timestamp: &str, + ) -> Result<HashMap<String, FilesPartitionRecord>> { + let metadata_table = self.new_metadata_table().await?; + + // Check if metadata table is up-to-date enough for the query timestamp. + // If the metadata table's latest commit is before the query timestamp, it might + // be missing files from later commits, so we should fall back to storage. 
Review Comment: The comment on line 616 says "If the metadata table's latest commit is before the query timestamp" but the condition on line 622 checks `latest_ts >= timestamp`, which means the metadata table timestamp is greater than or equal to the query timestamp. This is correct logic (the metadata table should be at least as current as the query), but the comment is misleading. The comment should say "If the metadata table's latest commit is not at least as recent as the query timestamp" or "If the query timestamp is after the metadata table's latest commit". ```suggestion // If the metadata table's latest commit is not at least as recent as the query // timestamp, it might be missing files from later commits, so we should fall back to storage. ``` ########## crates/core/src/file_group/mod.rs: ########## @@ -170,28 +175,50 @@ impl FileGroup { /// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup]. /// - /// For table version 6 (pre-1.0 spec): - /// Log file's timestamp matches the base file's commit timestamp exactly. + /// File slices are keyed by `commit_timestamp` (request/base instant time). + /// The log file association logic: /// - /// For table version 8+ (1.0 spec): - /// Log file's timestamp is its own write instant, which may differ from the base file's - /// timestamp. We find the FileSlice with the closest timestamp <= log file's timestamp. + /// - **With completion_timestamp (v8+ tables)**: Find the file slice with the largest + /// `commit_timestamp` (base instant time) that is <= log's `completion_timestamp`. + /// + /// - **Without completion_timestamp (v6 tables)**: Use exact matching or range lookup + /// based on log timestamp. /// /// TODO: support adding log files to file group without base files. 
pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> { - let log_timestamp = log_file.timestamp.as_str(); + // If log file has completion_timestamp, use completion-time-based association + // File slices are keyed by commit_timestamp (base instant time) + // Find the largest base instant time < log's completion time + if let Some(log_completion_time) = &log_file.completion_timestamp { + // Find file slice with largest base instant time + // (commit_timestamp) < log's completion time Review Comment: The documentation comment on line 182 states to find file slices with commit_timestamp "<= log's completion_timestamp", but the comment on line 191 says "Find the largest base instant time < log's completion time" (note the strict less-than). There's an inconsistency between these two comments. The code implementation uses exclusive range `..` (strict less-than), which matches line 191 but contradicts line 182. Please clarify the intended behavior and ensure the documentation is consistent. ########## crates/core/src/table/mod.rs: ########## @@ -237,6 +239,30 @@ impl Table { .into() } + /// Check if the metadata table "files" partition is enabled for file listing. + /// + /// Returns `true` if: + /// 1. `hoodie.metadata.enable` is true + /// 2. Table version is >= 8 (MDT support is only for v8+ tables) + /// 3. 
"files" is in the configured `hoodie.table.metadata.partitions` + pub fn is_metadata_table_enabled(&self) -> bool { + let metadata_enabled: bool = self + .hudi_configs + .get_or_default(MetadataTableEnabled) + .into(); + // TODO: remove this table version check when support files for v6 tables or drop v6 support + let table_version: isize = self + .hudi_configs + .get(TableVersion) + .map(|v| v.into()) + .unwrap_or(0); + metadata_enabled + && table_version >= 8 + && self + .get_metadata_table_partitions() + .contains(&FilesPartitionRecord::PARTITION_NAME.to_string()) + } Review Comment: The function name `is_metadata_table_enabled` might be misleading. This function checks if the metadata table *files partition* is enabled and ready for use, not just whether the metadata table feature is enabled. The function checks three conditions including table version and partition configuration. Consider renaming to `is_metadata_files_partition_enabled` or `can_use_metadata_table_for_files` to be more specific about what it checks. ########## crates/core/src/file_group/mod.rs: ########## @@ -170,28 +175,50 @@ impl FileGroup { /// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup]. /// - /// For table version 6 (pre-1.0 spec): - /// Log file's timestamp matches the base file's commit timestamp exactly. + /// File slices are keyed by `commit_timestamp` (request/base instant time). + /// The log file association logic: /// - /// For table version 8+ (1.0 spec): - /// Log file's timestamp is its own write instant, which may differ from the base file's - /// timestamp. We find the FileSlice with the closest timestamp <= log file's timestamp. + /// - **With completion_timestamp (v8+ tables)**: Find the file slice with the largest + /// `commit_timestamp` (base instant time) that is <= log's `completion_timestamp`. + /// + /// - **Without completion_timestamp (v6 tables)**: Use exact matching or range lookup + /// based on log timestamp. 
/// /// TODO: support adding log files to file group without base files. pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> { - let log_timestamp = log_file.timestamp.as_str(); + // If log file has completion_timestamp, use completion-time-based association + // File slices are keyed by commit_timestamp (base instant time) + // Find the largest base instant time < log's completion time + if let Some(log_completion_time) = &log_file.completion_timestamp { + // Find file slice with largest base instant time + // (commit_timestamp) < log's completion time + if let Some((_, file_slice)) = self + .file_slices + .range_mut(..log_completion_time.clone()) Review Comment: The range query uses `..log_completion_time.clone()` which is exclusive of the upper bound. According to the documentation comment on line 182, this should find file slices with `commit_timestamp` that is "<= log's completion_timestamp", but the exclusive range only includes values strictly less than the log's completion time. This means if a base file's commit_timestamp equals the log file's completion_timestamp, they won't be associated, which could be incorrect. Consider using an inclusive range `..=` instead of exclusive `..`. ```suggestion // Find the largest base instant time <= log's completion time if let Some(log_completion_time) = &log_file.completion_timestamp { // Find file slice with largest base instant time // (commit_timestamp) <= log's completion time if let Some((_, file_slice)) = self .file_slices .range_mut(..=log_completion_time.clone()) ``` ########## crates/core/src/file_group/builder.rs: ########## @@ -113,6 +136,109 @@ pub fn build_replaced_file_groups( Ok(file_groups) } +/// Build FileGroups from metadata table FilesPartitionRecords. +/// +/// Parses file names from metadata table records and constructs FileGroup objects. +/// Only includes active (non-deleted) files. 
+/// +/// # Arguments +/// * `records` - Metadata table files partition records (partition_path -> FilesPartitionRecord) +/// * `base_file_extension` - The base file format extension (e.g., "parquet", "hfile") +/// * `completion_time_view` - Optional view to look up completion timestamps. +/// - For v8+ tables: Pass a view to set completion timestamps on files +/// - For v6 tables: Pass None (v6 does not track completion timestamps) +/// +/// # Returns +/// A map of partition paths to their FileGroups. +pub fn file_groups_from_files_partition_records<V: TimelineViewByCompletionTime>( + records: &HashMap<String, FilesPartitionRecord>, + base_file_extension: &str, + completion_time_view: Option<&V>, +) -> Result<DashMap<String, Vec<FileGroup>>> { + let file_groups_map = DashMap::new(); + let base_file_suffix = format!(".{}", base_file_extension); + + for (partition_path, record) in records { + // Skip __all_partitions__ record - it lists partition names, not files + if record.is_all_partitions() { + continue; + } + + let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new(); + let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = HashMap::new(); + + for file_name in record.active_file_names() { + if file_name.starts_with('.') { + // Log file: starts with '.' 
+ match LogFile::from_str(file_name) { + Ok(mut log_file) => { + // Look up completion timestamp from the view if provided + if let Some(view) = completion_time_view { + log_file.set_completion_time(view); + } + file_id_to_log_files + .entry(log_file.file_id.clone()) + .or_default() + .push(log_file); + } + Err(e) => { + // Skip files that can't be parsed (e.g., .cdc files not yet supported) + log::warn!( + "Failed to parse log file from metadata table: {}: {}", + file_name, + e + ); + } + } + } else if file_name.ends_with(&base_file_suffix) { + // Base file: ends with base file extension + match BaseFile::from_str(file_name) { + Ok(mut base_file) => { + // Look up completion timestamp from the view if provided + if let Some(view) = completion_time_view { + base_file.set_completion_time(view); + } + file_id_to_base_files + .entry(base_file.file_id.clone()) + .or_default() + .push(base_file); + } + Err(e) => { + log::warn!( + "Failed to parse base file from metadata table: {}: {}", + file_name, + e + ); + } + } + } + // Skip files with unrecognized extensions + } + + // Build FileGroups from parsed files + // Note: Currently only supports file groups with base files. + // TODO: Support file groups with only log files (P1 task) + let mut file_groups = Vec::new(); + for (file_id, base_files) in file_id_to_base_files { + let mut fg = FileGroup::new(file_id.clone(), partition_path.clone()); + fg.add_base_files(base_files)?; + + // Add corresponding log files if any + if let Some(log_files) = file_id_to_log_files.remove(&file_id) { + fg.add_log_files(log_files)?; + } + + file_groups.push(fg); + } + + if !file_groups.is_empty() { + file_groups_map.insert(partition_path.clone(), file_groups); + } + } + + Ok(file_groups_map) +} Review Comment: The new function `file_groups_from_files_partition_records` (lines 153-240) lacks test coverage. This is a critical function that converts metadata table records to file groups with completion timestamp handling. 
Consider adding tests that verify: 1. Parsing base files and log files from metadata table records 2. Setting completion timestamps correctly via the completion_time_view 3. Filtering uncommitted files when appropriate 4. Handling the __all_partitions__ record correctly 5. Handling parse errors gracefully ########## crates/core/src/file_group/builder.rs: ########## @@ -113,6 +136,109 @@ pub fn build_replaced_file_groups( Ok(file_groups) } +/// Build FileGroups from metadata table FilesPartitionRecords. +/// +/// Parses file names from metadata table records and constructs FileGroup objects. +/// Only includes active (non-deleted) files. +/// +/// # Arguments +/// * `records` - Metadata table files partition records (partition_path -> FilesPartitionRecord) +/// * `base_file_extension` - The base file format extension (e.g., "parquet", "hfile") +/// * `completion_time_view` - Optional view to look up completion timestamps. +/// - For v8+ tables: Pass a view to set completion timestamps on files +/// - For v6 tables: Pass None (v6 does not track completion timestamps) +/// +/// # Returns +/// A map of partition paths to their FileGroups. +pub fn file_groups_from_files_partition_records<V: TimelineViewByCompletionTime>( + records: &HashMap<String, FilesPartitionRecord>, + base_file_extension: &str, + completion_time_view: Option<&V>, +) -> Result<DashMap<String, Vec<FileGroup>>> { + let file_groups_map = DashMap::new(); + let base_file_suffix = format!(".{}", base_file_extension); + + for (partition_path, record) in records { + // Skip __all_partitions__ record - it lists partition names, not files + if record.is_all_partitions() { + continue; + } + + let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new(); + let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = HashMap::new(); + + for file_name in record.active_file_names() { + if file_name.starts_with('.') { + // Log file: starts with '.' 
+ match LogFile::from_str(file_name) { + Ok(mut log_file) => { + // Look up completion timestamp from the view if provided + if let Some(view) = completion_time_view { + log_file.set_completion_time(view); + } + file_id_to_log_files + .entry(log_file.file_id.clone()) + .or_default() + .push(log_file); + } + Err(e) => { + // Skip files that can't be parsed (e.g., .cdc files not yet supported) + log::warn!( + "Failed to parse log file from metadata table: {}: {}", + file_name, + e + ); + } + } + } else if file_name.ends_with(&base_file_suffix) { + // Base file: ends with base file extension + match BaseFile::from_str(file_name) { + Ok(mut base_file) => { + // Look up completion timestamp from the view if provided + if let Some(view) = completion_time_view { + base_file.set_completion_time(view); + } + file_id_to_base_files + .entry(base_file.file_id.clone()) + .or_default() + .push(base_file); Review Comment: The code should filter out uncommitted files when processing metadata table records for v8+ tables. Currently, files are parsed and added to file groups even if they have no completion timestamp (uncommitted files). This could lead to uncommitted files being included in query results when using metadata table acceleration. Consider filtering uncommitted files similar to how it's done in the storage listing path (listing.rs lines 99-104, 117-122) by checking if `should_filter_uncommitted()` is true and the file's completion_timestamp is None. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
