Copilot commented on code in PR #501:
URL: https://github.com/apache/hudi-rs/pull/501#discussion_r2648814724
##########
crates/core/src/file_group/mod.rs:
##########
@@ -96,19 +96,22 @@ impl FileGroup {
Ok(file_group)
}
+ /// Merge another file group into this one.
+ ///
+ /// The file slices are merged by their keys (commit_timestamp / base
instant time).
pub fn merge(&mut self, other: &FileGroup) -> Result<()> {
if self != other {
return Err(CoreError::FileGroup(format!(
"Cannot merge different file groups: {self} and {other}",
)));
}
- for (commit_timestamp, other_file_slice) in other.file_slices.iter() {
- if let Some(existing_file_slice) =
self.file_slices.get_mut(commit_timestamp) {
+ for (key, other_file_slice) in other.file_slices.iter() {
+ if let Some(existing_file_slice) = self.file_slices.get_mut(key) {
existing_file_slice.merge(other_file_slice)?;
} else {
self.file_slices
- .insert(commit_timestamp.clone(),
other_file_slice.clone());
+ .insert(key.clone(), other_file_slice.clone());
Review Comment:
The variable name `key` is ambiguous in the context of file slices. Since
this is the `commit_timestamp` (base instant time / request timestamp),
consider using a more descriptive name like `commit_timestamp_key` or
`file_slice_key` to make the code more maintainable and align with the
documentation that explicitly states "file slices are keyed by
commit_timestamp".
##########
crates/core/src/file_group/builder.rs:
##########
@@ -113,6 +143,112 @@ pub fn build_replaced_file_groups(
Ok(file_groups)
}
+/// Build FileGroups from metadata table FilesPartitionRecords.
+///
+/// This function is used for **snapshot queries** when the metadata table is
enabled.
+/// It parses file names from the `.hoodie/metadata` table's files partition
records
+/// and constructs FileGroup objects. Only includes active (non-deleted) files.
+///
+/// # Arguments
+/// * `records` - Metadata table files partition records (partition_path ->
FilesPartitionRecord)
+/// * `base_file_extension` - The base file format extension (e.g., "parquet",
"hfile")
+/// * `completion_time_view` - View to look up completion timestamps.
+/// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for all
lookups)
+/// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
+///
+/// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
+/// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
+///
+/// # Returns
+/// A map of partition paths to their FileGroups.
+pub fn file_groups_from_files_partition_records<V:
TimelineViewByCompletionTime>(
+ records: &HashMap<String, FilesPartitionRecord>,
+ base_file_extension: &str,
+ completion_time_view: &V,
+) -> Result<DashMap<String, Vec<FileGroup>>> {
+ let file_groups_map = DashMap::new();
+ let base_file_suffix = format!(".{}", base_file_extension);
+
+ for (partition_path, record) in records {
+ // Skip __all_partitions__ record - it lists partition names, not files
+ if record.is_all_partitions() {
+ continue;
+ }
+
+ let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
+ let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> =
HashMap::new();
+
+ for file_name in record.active_file_names() {
+ if file_name.starts_with('.') {
+ // Log file: starts with '.'
+ let mut log_file = LogFile::from_str(file_name).map_err(|e| {
+ CoreError::FileGroup(format!(
+ "Metadata table contains invalid/unsupported log file
name '{}' in partition '{}': {}. \
+ This may indicate data corruption.",
+ file_name, partition_path, e
+ ))
+ })?;
Review Comment:
When building file groups from metadata table records, if a log file has an
invalid name, the function returns an error that stops processing all remaining
files. This means one corrupted log file name will cause the entire metadata
table read to fail. Consider logging the error and skipping the invalid file
instead, similar to how unrecognized file extensions are skipped, to make the
system more resilient to data corruption.
##########
crates/core/src/file_group/mod.rs:
##########
@@ -170,28 +183,58 @@ impl FileGroup {
/// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup].
///
- /// For table version 6 (pre-1.0 spec):
- /// Log file's timestamp matches the base file's commit timestamp
exactly.
+ /// File slices are keyed by `commit_timestamp` (request/base instant
time).
+ /// The log file association logic:
+ ///
+ /// - **With completion_timestamp (v8+ tables)**: Find the file slice with
the largest
+ /// `commit_timestamp` (base instant time) that is <= log's
`completion_timestamp`.
///
- /// For table version 8+ (1.0 spec):
- /// Log file's timestamp is its own write instant, which may differ from
the base file's
- /// timestamp. We find the FileSlice with the closest timestamp <= log
file's timestamp.
+ /// - **Without completion_timestamp (v6 tables)**: Use exact matching or
range lookup
+ /// based on log timestamp.
///
/// TODO: support adding log files to file group without base files.
pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> {
- let log_timestamp = log_file.timestamp.as_str();
+ // Validate file_id matches
+ if log_file.file_id != self.file_id {
+ return Err(CoreError::FileGroup(format!(
+ "Log file ID '{}' does not match File Group ID '{}'",
+ log_file.file_id, self.file_id
+ )));
+ }
- // Try exact match first (works for both v6 and v8 when timestamps
happen to match)
- if let Some(file_slice) = self.file_slices.get_mut(log_timestamp) {
- file_slice.log_files.insert(log_file);
- return Ok(self);
+ // If log file has completion_timestamp, use completion-time-based
association
+ // File slices are keyed by commit_timestamp (base instant time)
+ // Find the largest base instant time <= log's completion time
+ if let Some(log_completion_time) = &log_file.completion_timestamp {
+ // Find file slice with largest base instant time
+ // (commit_timestamp) <= log's completion time
+ if let Some((_, file_slice)) = self
+ .file_slices
+ .range_mut(..=log_completion_time.clone())
+ .next_back()
+ {
+ file_slice.log_files.insert(log_file);
+ return Ok(self);
+ }
+
+ // No file slice with base instant time <= log's completion time
found.
+ // This means the log file completed before any base file's
request time.
+ // TODO: Support log files without base files in a future priority
task.
+ return Err(CoreError::FileGroup(format!(
+ "No suitable FileSlice found for log file with
completion_timestamp {} in File Group {}. \
+ Log file completed before any base file's request time.",
Review Comment:
The error message states "Log file completed before any base file's request
time" but this might be confusing. The actual condition is that the log's
completion time is less than all base files' commit timestamps (request times).
Consider clarifying: "No suitable FileSlice found: log file's completion
timestamp is earlier than all base files' commit timestamps in this file group."
```suggestion
// This means the log file's completion timestamp is earlier
than all base files' commit timestamps.
// TODO: Support log files without base files in a future
priority task.
return Err(CoreError::FileGroup(format!(
"No suitable FileSlice found: log file's completion
timestamp {} is earlier than all \
base files' commit timestamps in File Group {}.",
```
##########
crates/core/src/file_group/builder.rs:
##########
@@ -113,6 +143,112 @@ pub fn build_replaced_file_groups(
Ok(file_groups)
}
+/// Build FileGroups from metadata table FilesPartitionRecords.
+///
+/// This function is used for **snapshot queries** when the metadata table is
enabled.
+/// It parses file names from the `.hoodie/metadata` table's files partition
records
+/// and constructs FileGroup objects. Only includes active (non-deleted) files.
+///
+/// # Arguments
+/// * `records` - Metadata table files partition records (partition_path ->
FilesPartitionRecord)
+/// * `base_file_extension` - The base file format extension (e.g., "parquet",
"hfile")
+/// * `completion_time_view` - View to look up completion timestamps.
+/// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for all
lookups)
+/// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
+///
+/// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
+/// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
+///
+/// # Returns
+/// A map of partition paths to their FileGroups.
+pub fn file_groups_from_files_partition_records<V:
TimelineViewByCompletionTime>(
+ records: &HashMap<String, FilesPartitionRecord>,
+ base_file_extension: &str,
+ completion_time_view: &V,
+) -> Result<DashMap<String, Vec<FileGroup>>> {
+ let file_groups_map = DashMap::new();
+ let base_file_suffix = format!(".{}", base_file_extension);
+
+ for (partition_path, record) in records {
+ // Skip __all_partitions__ record - it lists partition names, not files
+ if record.is_all_partitions() {
+ continue;
+ }
+
+ let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
+ let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> =
HashMap::new();
+
+ for file_name in record.active_file_names() {
+ if file_name.starts_with('.') {
+ // Log file: starts with '.'
+ let mut log_file = LogFile::from_str(file_name).map_err(|e| {
+ CoreError::FileGroup(format!(
+ "Metadata table contains invalid/unsupported log file
name '{}' in partition '{}': {}. \
+ This may indicate data corruption.",
+ file_name, partition_path, e
+ ))
+ })?;
+
+ log_file.set_completion_time(completion_time_view);
+ // Filter uncommitted files for v8+ tables
+ if completion_time_view.should_filter_uncommitted()
+ && log_file.completion_timestamp.is_none()
+ {
+ continue;
+ }
+ file_id_to_log_files
+ .entry(log_file.file_id.clone())
+ .or_default()
+ .push(log_file);
+ } else if file_name.ends_with(&base_file_suffix) {
+ // Base file: ends with base file extension
+ let mut base_file = BaseFile::from_str(file_name).map_err(|e| {
+ CoreError::FileGroup(format!(
+ "Metadata table contains invalid/unsupported base file
name '{}' in partition '{}': {}. \
+ This may indicate data corruption.",
+ file_name, partition_path, e
+ ))
+ })?;
Review Comment:
Similar to log files, invalid base file names cause the entire metadata
table read to fail. Consider handling this more gracefully by logging and
skipping invalid files rather than failing the entire operation, especially
since the error message suggests "data corruption" but corruption should
ideally be isolated to specific files rather than blocking all reads.
##########
crates/core/src/file_group/log_file/mod.rs:
##########
@@ -32,20 +33,32 @@ pub mod scanner;
/// Represents a Hudi log file (delta log).
///
-/// The `timestamp` field has different semantics depending on table version:
-/// - **Table version 6 (pre-1.0 spec)**: The base commit timestamp that this
log file belongs to.
-/// - **Table version 8+ (1.0 spec)**: The request instant timestamp of the
deltacommit that wrote
-/// this log file.
+/// The `timestamp` field is the timestamp embedded in the log file name:
+/// - For v6 tables: base commit timestamp (matches the base file's commit
timestamp).
+/// - For v8+ tables: request instant timestamp of the deltacommit.
+///
+/// The `completion_timestamp` field is used to determine the correct ordering
and
+/// file slice association:
+/// - For v6 tables: This is always `None` (v6 does not track completion
timestamps).
+/// - For v8+ tables: Set from the timeline when the commit is completed.
+/// If `None`, the commit is still pending and the file should not be
included in queries.
+///
+/// Ordering is based on `completion_timestamp` (if available), falling back
to `timestamp`
+/// for uncommitted files. Completed files are ordered before uncommitted ones.
#[derive(Clone, Debug)]
pub struct LogFile {
pub file_id: String,
/// The timestamp embedded in the log file name.
- ///
- /// For v6 tables: base commit timestamp (matches the base file's commit
timestamp).
- /// For v8+ tables: request instant timestamp of the deltacommit that
wrote this log file.
pub timestamp: String,
+ /// The completion timestamp of the commit that wrote this log file.
+ ///
+ /// For v6 tables: This is always `None` (v6 does not track completion
timestamps).
+ /// For v8+ tables: Set from the timeline; `None` if commit is pending.
+ pub completion_timestamp: Option<String>,
pub extension: String,
- pub version: String,
+ /// Log file version number. Starts at 1 and increments when the log file
rolls over
+ /// (e.g., reaches size limit) within the same delta commit by the same
writer.
+ pub version: u32,
Review Comment:
The `version` field type was changed from `String` to `u32`. This is a
breaking change that improves correctness (enables proper integer ordering).
Ensure that all serialization/deserialization paths handle this correctly,
especially if log file metadata is persisted or transmitted over the network in
a way that expects strings.
##########
crates/core/src/table/mod.rs:
##########
@@ -237,6 +239,44 @@ impl Table {
.into()
}
+ /// Check if the metadata table "files" partition is enabled for file
listing.
+ ///
+ /// Returns `true` if:
+ /// 1. Table version is >= 8 (MDT support is only for v8+ tables), AND
+ /// 2. Either:
+ /// - `hoodie.metadata.enable` is explicitly true, OR
+ /// - "files" is in the configured `hoodie.table.metadata.partitions`
+ /// (implicit enablement for v8+ tables with configured partitions)
+ ///
+ /// This matches Hudi Java behavior where metadata table is considered
active
+ /// when partitions are configured, even without explicit
`hoodie.metadata.enable=true`.
+ pub fn is_metadata_table_enabled(&self) -> bool {
+ // Check table version first - MDT is only supported for v8+ tables
+ let table_version: isize = self
+ .hudi_configs
+ .get(TableVersion)
+ .map(|v| v.into())
+ .unwrap_or(0);
+
+ if table_version < 8 {
+ return false;
+ }
+
+ // Check if "files" partition is configured
+ let has_files_partition = self
+ .get_metadata_table_partitions()
+ .contains(&FilesPartitionRecord::PARTITION_NAME.to_string());
+
+ // Explicit check for hoodie.metadata.enable
+ let metadata_explicitly_enabled: bool = self
+ .hudi_configs
+ .get_or_default(MetadataTableEnabled)
+ .into();
+
+ // Enable if explicitly enabled OR if files partition is configured
(implicit enablement)
+ metadata_explicitly_enabled || has_files_partition
+ }
Review Comment:
The logic checks if table version is less than 8 to disable metadata table,
but then checks if files partition is configured OR if metadata is explicitly
enabled. This creates an asymmetry: v6 tables can never enable MDT (correct),
but for v8+ tables, the check returns true if EITHER explicit enable OR files
partition exists. This means for v8+ tables, having files partition configured
implicitly enables MDT even if `hoodie.metadata.enable=false`. Consider
documenting this override behavior more explicitly or checking the explicit
enable flag first to respect `false` values.
##########
crates/core/src/file_group/mod.rs:
##########
@@ -170,28 +183,58 @@ impl FileGroup {
/// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup].
///
- /// For table version 6 (pre-1.0 spec):
- /// Log file's timestamp matches the base file's commit timestamp
exactly.
+ /// File slices are keyed by `commit_timestamp` (request/base instant
time).
+ /// The log file association logic:
+ ///
+ /// - **With completion_timestamp (v8+ tables)**: Find the file slice with
the largest
+ /// `commit_timestamp` (base instant time) that is <= log's
`completion_timestamp`.
///
- /// For table version 8+ (1.0 spec):
- /// Log file's timestamp is its own write instant, which may differ from
the base file's
- /// timestamp. We find the FileSlice with the closest timestamp <= log
file's timestamp.
+ /// - **Without completion_timestamp (v6 tables)**: Use exact matching or
range lookup
+ /// based on log timestamp.
///
/// TODO: support adding log files to file group without base files.
pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> {
- let log_timestamp = log_file.timestamp.as_str();
+ // Validate file_id matches
+ if log_file.file_id != self.file_id {
+ return Err(CoreError::FileGroup(format!(
+ "Log file ID '{}' does not match File Group ID '{}'",
+ log_file.file_id, self.file_id
+ )));
+ }
- // Try exact match first (works for both v6 and v8 when timestamps
happen to match)
- if let Some(file_slice) = self.file_slices.get_mut(log_timestamp) {
- file_slice.log_files.insert(log_file);
- return Ok(self);
+ // If log file has completion_timestamp, use completion-time-based
association
+ // File slices are keyed by commit_timestamp (base instant time)
+ // Find the largest base instant time <= log's completion time
+ if let Some(log_completion_time) = &log_file.completion_timestamp {
+ // Find file slice with largest base instant time
+ // (commit_timestamp) <= log's completion time
+ if let Some((_, file_slice)) = self
+ .file_slices
+ .range_mut(..=log_completion_time.clone())
+ .next_back()
Review Comment:
The log file association logic uses
`range_mut(..=log_completion_time.clone())`, which allocates a fresh `String`
for the range bound on every call. At this point `log_completion_time` is a
`&String` borrowed from `log_file`, so the clone is avoidable: because
`String: Borrow<str>`, `BTreeMap` range lookups accept a `&str` bound. Consider
`self.file_slices.range_mut::<str, _>(..=log_completion_time.as_str())`
(the turbofish selects `Q = str`) to eliminate the allocation.
##########
crates/core/src/table/fs_view.rs:
##########
@@ -54,20 +58,83 @@ impl FileSystemView {
})
}
- async fn load_file_groups(&self, partition_pruner: &PartitionPruner) ->
Result<()> {
+ /// Load file groups by listing from the file system.
+ ///
+ /// # Arguments
+ /// * `partition_pruner` - Filters which partitions to include
+ /// * `completion_time_view` - View to look up completion timestamps.
+ /// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for
all lookups)
+ /// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
+ ///
+ /// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
+ /// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
+ async fn load_file_groups_from_file_system<V: TimelineViewByCompletionTime
+ Sync>(
+ &self,
+ partition_pruner: &PartitionPruner,
+ completion_time_view: &V,
+ ) -> Result<()> {
let lister = FileLister::new(
self.hudi_configs.clone(),
self.storage.clone(),
partition_pruner.to_owned(),
);
- let file_groups_map =
lister.list_file_groups_for_relevant_partitions().await?;
+ let file_groups_map = lister
+ .list_file_groups_for_relevant_partitions(completion_time_view)
+ .await?;
for (partition_path, file_groups) in file_groups_map {
self.partition_to_file_groups
.insert(partition_path, file_groups);
}
Ok(())
}
+ /// Load file groups from metadata table records.
+ ///
+ /// This is an alternative to `load_file_groups_from_file_system` that
uses pre-fetched
+ /// metadata table records instead of listing from storage. Only
partitions that pass the
+ /// partition pruner will be loaded.
+ ///
+ /// This method is not async because it operates on pre-fetched records -
no I/O is needed.
+ ///
+ /// TODO: Consider making this async and fetching metadata table records
within this method
+ /// instead of receiving pre-fetched records. This would make the API more
symmetric with
+ /// `load_file_groups_from_file_system` and encapsulate the metadata table
fetching logic.
+ ///
+ /// # Arguments
+ /// * `metadata_table_records` - Metadata table files partition records
+ /// * `partition_pruner` - Filters which partitions to include
+ /// * `completion_time_view` - View to look up completion timestamps.
+ /// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for
all lookups)
+ /// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
+ ///
+ /// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
+ /// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
+ fn load_file_groups_from_metadata_table<V: TimelineViewByCompletionTime>(
+ &self,
+ metadata_table_records: &HashMap<String, FilesPartitionRecord>,
+ partition_pruner: &PartitionPruner,
+ completion_time_view: &V,
+ ) -> Result<()> {
+ let base_file_format: String =
self.hudi_configs.get_or_default(BaseFileFormat).into();
+ let file_groups_map = file_groups_from_files_partition_records(
+ metadata_table_records,
+ &base_file_format,
+ completion_time_view,
+ )?;
+
+ for entry in file_groups_map.iter() {
+ let partition_path = entry.key();
+ if partition_pruner.is_empty() ||
partition_pruner.should_include(partition_path) {
+ self.partition_to_file_groups
+ .insert(partition_path.clone(), entry.value().clone());
+ }
+ }
+ Ok(())
+ }
Review Comment:
The `load_file_groups_from_file_system` and
`load_file_groups_from_metadata_table` methods both write to
`partition_to_file_groups` which is a shared `DashMap`. However, there's a
potential issue: if `get_file_slices_as_of` is called concurrently from
multiple threads, both load methods could run simultaneously and race to
populate the map. While DashMap is thread-safe for individual operations, the
methods don't clear existing data before inserting, which could lead to
unexpected behavior if called multiple times. Consider either: (1) clearing the
map at the start of these methods, (2) using a mutex to ensure only one load
operation runs at a time, or (3) documenting that these methods should not be
called concurrently.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]