Copilot commented on code in PR #501:
URL: https://github.com/apache/hudi-rs/pull/501#discussion_r2647460913


##########
crates/core/src/file_group/mod.rs:
##########
@@ -170,26 +177,53 @@ impl FileGroup {
 
     /// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup].
     ///
-    /// For table version 6 (pre-1.0 spec):
-    ///   Log file's timestamp matches the base file's commit timestamp exactly.
+    /// File slices are keyed by `commit_timestamp` (request/base instant time).
+    /// The log file association logic:
     ///
-    /// For table version 8+ (1.0 spec):
-    ///   Log file's timestamp is its own write instant, which may differ from the base file's
-    ///   timestamp. We find the FileSlice with the closest timestamp <= log file's timestamp.
+    /// - **With completion_timestamp (v8+ tables)**: Find the file slice with the largest
+    ///   `commit_timestamp` (base instant time) that is <= log's `completion_timestamp`.
+    ///
+    /// - **Without completion_timestamp (v6 tables)**: Use exact matching or range lookup
+    ///   based on log timestamp.
     ///
     /// TODO: support adding log files to file group without base files.
     pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> {
+        // If log file has completion_timestamp, use completion-time-based association
+        // File slices are keyed by commit_timestamp (base instant time)
+        // Find the largest base instant time <= log's completion time
+        if let Some(log_completion_time) = &log_file.completion_timestamp {
+            // Find file slice with largest base instant time
+            // (commit_timestamp) <= log's completion time
+            if let Some((_, file_slice)) = self
+                .file_slices
+                .range_mut(..=log_completion_time.clone())
+                .next_back()
+            {
+                file_slice.log_files.insert(log_file);
+                return Ok(self);
+            }
+
+            // No file slice with base instant time <= log's completion time found.
+            // This means the log file completed before any base file's request time.
+            // TODO: Support log files without base files in a future priority task.
+            return Err(CoreError::FileGroup(format!(
+                "No suitable FileSlice found for log file with completion_timestamp {} in File Group {}. \
+                Log file completed before any base file's request time.",

Review Comment:
   When a log file has a completion_timestamp but no suitable FileSlice is
   found (lines 206-213), the error message states "Log file completed before
   any base file's request time." The same branch is also reached when the
   FileGroup contains no file slices at all, so the message can point at the
   wrong cause. Consider making it cover both cases: "No suitable FileSlice
   found for log file with completion_timestamp {timestamp} in File Group
   {file_id}. Either the log file completed before any base file's request
   time, or the FileGroup has no base files." This gives more context for
   debugging.
   ```suggestion
               // This may mean the log file completed before any base file's request time,
               // or that the FileGroup has no base files.
               return Err(CoreError::FileGroup(format!(
                   "No suitable FileSlice found for log file with completion_timestamp {} in File Group {}. \
                   Either the log file completed before any base file's request time, or the FileGroup has no base files.",
   ```
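
   To illustrate the two cases, here is a minimal sketch, not the actual
   hudi-rs code. `NoSliceReason` and `find_slice` are hypothetical names, and
   file slices are reduced to a `BTreeMap` from base instant time to a list of
   log file names (an assumption made only to keep the example self-contained):

   ```rust
   use std::collections::BTreeMap;

   /// Why the lookup found no slice. Hypothetical type, not part of hudi-rs.
   #[derive(Debug, PartialEq)]
   enum NoSliceReason {
       /// The file group holds no file slices at all.
       NoFileSlices,
       /// Every slice has a base instant time later than the log's completion time.
       CompletedBeforeAnyBase,
   }

   /// Return the slice with the largest key <= `completion_time`,
   /// or the reason why no such slice exists.
   fn find_slice<'a>(
       slices: &'a BTreeMap<String, Vec<String>>,
       completion_time: &str,
   ) -> Result<&'a Vec<String>, NoSliceReason> {
       // Same range lookup as in the hunk above: the last entry of `..=key`.
       if let Some((_, slice)) = slices.range(..=completion_time.to_string()).next_back() {
           return Ok(slice);
       }
       // The lookup failed; distinguish the two possible causes.
       if slices.is_empty() {
           Err(NoSliceReason::NoFileSlices)
       } else {
           Err(NoSliceReason::CompletedBeforeAnyBase)
       }
   }
   ```

   Whether the real error should distinguish the causes with separate variants
   or simply mention both in one message, as in the suggestion, is a design
   choice left to the author.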



##########
crates/core/src/timeline/completion_time.rs:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Completion time query view for looking up completion timestamps from request timestamps.
+//!
+//! This module provides the [`TimelineViewByCompletionTime`] trait and implementations
+//! for mapping request timestamps to completion timestamps. This is essential for
+//! v8+ tables where file names contain request timestamps but file ordering and
+//! association must be based on completion timestamps.
+//!
+//! # Table Version Differences
+//!
+//! - **v6 tables**: Request timestamp equals completion timestamp. Use [`V6CompletionTimeView`]
+//!   which simply returns the request timestamp as the completion timestamp.

Review Comment:
   The module documentation at line 29 states that V6CompletionTimeView "simply
   returns the request timestamp as the completion timestamp." The actual
   implementation returns `None` from `get_completion_time`, so the two
   disagree. Either change the documentation to say that it "returns None to
   indicate completion time is not tracked", or change the implementation to
   return the request timestamp. Given the context and usage patterns,
   returning `None` appears to be the intended behavior, so the documentation
   is what should be corrected.
   ```suggestion
   //! - **v6 tables**: Completion time is not tracked. Use [`V6CompletionTimeView`],
   //!   which returns `None` from [`TimelineViewByCompletionTime::get_completion_time`]
   //!   to indicate that the completion timestamp is not available.
   ```
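
   To show what the `None` behavior means for a caller, here is a minimal
   sketch. The trait signature is copied from this PR, but `MapBackedView` and
   `resolve_completion` are hypothetical names introduced only for
   illustration:

   ```rust
   use std::collections::HashMap;

   /// Trait signature as it appears in this PR.
   trait TimelineViewByCompletionTime {
       fn get_completion_time(&self, request_timestamp: &str) -> Option<&str>;
   }

   /// v6 view: completion time is not tracked, so every lookup yields `None`.
   struct V6CompletionTimeView;

   impl TimelineViewByCompletionTime for V6CompletionTimeView {
       fn get_completion_time(&self, _request_timestamp: &str) -> Option<&str> {
           None
       }
   }

   /// Hypothetical map-backed view standing in for the v8+ implementation.
   struct MapBackedView {
       request_to_completion: HashMap<String, String>,
   }

   impl TimelineViewByCompletionTime for MapBackedView {
       fn get_completion_time(&self, request_timestamp: &str) -> Option<&str> {
           self.request_to_completion
               .get(request_timestamp)
               .map(|s| s.as_str())
       }
   }

   /// Hypothetical helper: the value a caller would store as a file's
   /// completion timestamp after consulting the view.
   fn resolve_completion<V: TimelineViewByCompletionTime>(
       view: &V,
       request_timestamp: &str,
   ) -> Option<String> {
       view.get_completion_time(request_timestamp)
           .map(|s| s.to_string())
   }
   ```

   With the v6 view, `resolve_completion` always yields `None`, so a file's
   completion timestamp stays unset rather than being a copy of the request
   timestamp, which is what the current module documentation implies.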



##########
crates/core/src/timeline/completion_time.rs:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Completion time query view for looking up completion timestamps from request timestamps.
+//!
+//! This module provides the [`TimelineViewByCompletionTime`] trait and implementations
+//! for mapping request timestamps to completion timestamps. This is essential for
+//! v8+ tables where file names contain request timestamps but file ordering and
+//! association must be based on completion timestamps.
+//!
+//! # Table Version Differences
+//!
+//! - **v6 tables**: Request timestamp equals completion timestamp. Use [`V6CompletionTimeView`]
+//!   which simply returns the request timestamp as the completion timestamp.
+//!
+//! - **v8+ tables**: Request and completion timestamps are different. The completion timestamp
+//!   is stored in the timeline instant filename as `{request}_{completion}.{action}`.
+//!   Use [`CompletionTimeView`] to look up completion timestamps from a map.
+
+use crate::timeline::instant::Instant;
+use std::collections::HashMap;
+
+/// A view for querying completion timestamps from request timestamps.
+///
+/// This trait abstracts the completion time lookup logic to support both
+/// v6 tables (where request == completion) and v8+ tables (where they differ).
+pub trait TimelineViewByCompletionTime {
+    /// Get the completion timestamp for a given request timestamp.
+    ///
+    /// Returns `Some(completion_timestamp)` if the request timestamp corresponds
+    /// to a completed commit, `None` if the commit is pending or unknown.
+    fn get_completion_time(&self, request_timestamp: &str) -> Option<&str>;
+
+    /// Check if a request timestamp corresponds to a completed commit.
+    fn is_completed(&self, request_timestamp: &str) -> bool {
+        self.get_completion_time(request_timestamp).is_some()
+    }
+}
+
+/// Completion time view for v6 tables.
+///
+/// In v6, completion time is not tracked. Acting as no-op.
+#[derive(Debug, Default)]
+pub struct V6CompletionTimeView;
+
+impl V6CompletionTimeView {
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+impl TimelineViewByCompletionTime for V6CompletionTimeView {
+    fn get_completion_time(&self, _request_timestamp: &str) -> Option<&str> {
+        None
+    }
+
+    fn is_completed(&self, _request_timestamp: &str) -> bool {
+        true
+    }
+}
+
+/// Completion time view for v8+ tables backed by a HashMap.
+///
+/// This view is built from timeline instants and maps request timestamps
+/// to their corresponding completion timestamps.
+#[derive(Debug)]
+pub struct CompletionTimeView {
+    /// Map from request timestamp to completion timestamp
+    request_to_completion: HashMap<String, String>,
+}
+
+impl CompletionTimeView {
+    /// Create a new completion time view from timeline instants.
+    ///
+    /// Only completed instants with a completion timestamp are included.
+    pub fn from_instants<'a, I>(instants: I) -> Self
+    where
+        I: IntoIterator<Item = &'a Instant>,
+    {
+        let request_to_completion = instants
+            .into_iter()
+            .filter_map(|instant| {
+                instant.completed_timestamp.as_ref().map(|completion_ts| {
+                    (instant.timestamp.clone(), completion_ts.clone())
+                })
+            })
+            .collect();
+
+        Self {
+            request_to_completion,
+        }
+    }
+
+    /// Create a new empty completion time view.
+    pub fn empty() -> Self {
+        Self {
+            request_to_completion: HashMap::new(),
+        }
+    }
+
+    /// Check if this view has any completion time mappings.
+    pub fn is_empty(&self) -> bool {
+        self.request_to_completion.is_empty()
+    }
+
+    /// Get the number of completion time mappings.
+    pub fn len(&self) -> usize {
+        self.request_to_completion.len()
+    }

Review Comment:
   `len()` and `is_empty()` are implemented independently, each calling the
   corresponding method on the underlying map. Clippy's `len_without_is_empty`
   lint only requires that a public `len()` be accompanied by a public
   `is_empty()`, which this type already satisfies, so nothing is flagged
   here. Still, consider implementing one in terms of the other (for example,
   `is_empty()` as `self.len() == 0`) so the two cannot drift apart if the
   backing storage changes.
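
   A minimal sketch of implementing one method in terms of the other, using a
   simplified copy of the struct from this PR (only the map field is kept):

   ```rust
   use std::collections::HashMap;

   /// Simplified stand-in for the struct in this PR.
   pub struct CompletionTimeView {
       request_to_completion: HashMap<String, String>,
   }

   impl CompletionTimeView {
       /// Number of request -> completion mappings.
       pub fn len(&self) -> usize {
           self.request_to_completion.len()
       }

       /// Defined through `len()`, so the two methods always agree.
       pub fn is_empty(&self) -> bool {
           self.len() == 0
       }
   }
   ```

   This is a stylistic choice; delegating directly to the map, as the PR does
   today, behaves the same.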



##########
crates/core/src/table/mod.rs:
##########
@@ -533,18 +551,76 @@ impl Table {
         timestamp: &str,
         filters: &[Filter],
     ) -> Result<Vec<FileSlice>> {
+        // Create completion time view for setting completion timestamps on files
+        let completion_time_view = self.timeline.create_completion_time_view();
+
+        // File slices are keyed by commit_timestamp (request/base instant time).
         let excludes = self
             .timeline
             .get_replaced_file_groups_as_of(timestamp)
             .await?;
         let partition_schema = self.get_partition_schema().await?;
         let partition_pruner =
             PartitionPruner::new(filters, &partition_schema, self.hudi_configs.as_ref())?;
+
+        // Try MDT-accelerated listing if files partition is configured
+        let mdt_records = if self.is_metadata_table_enabled() {
+            match self.fetch_mdt_records_if_valid(timestamp).await {
+                Ok(records) => Some(records),
+                Err(e) => {
+                    // Fall through to storage listing if MDT fails
+                    log::warn!(
+                        "Failed to read file slices from metadata table, falling back to storage listing: {}",
+                        e
+                    );
+                    None
+                }
+            }
+        } else {
+            None
+        };

Review Comment:
   `mdt_records` exists only to carry the result of the MDT lookup to a later
   `if let Some(records) = mdt_records` check, and it is `None` both when
   `is_metadata_table_enabled()` returns false and when the lookup fails.
   Consider moving the MDT-based listing into the
   `if self.is_metadata_table_enabled()` branch and returning from there on
   success, so that the intermediate variable and the second `if let` are no
   longer needed. This would improve readability and reduce nesting.
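
   The shape of that refactoring can be sketched as follows. This is a
   synchronous, simplified illustration, not the PR's code:
   `fetch_mdt_records`, `list_from_storage`, and `list_files` are hypothetical
   stand-ins, and file listings are reduced to vectors of strings:

   ```rust
   /// Hypothetical stand-in for the MDT lookup; `fail` simulates an MDT error.
   fn fetch_mdt_records(fail: bool) -> Result<Vec<String>, String> {
       if fail {
           Err("metadata table unavailable".to_string())
       } else {
           Ok(vec!["from-mdt".to_string()])
       }
   }

   /// Hypothetical stand-in for the storage-listing fallback.
   fn list_from_storage() -> Vec<String> {
       vec!["from-storage".to_string()]
   }

   /// Early return on MDT success; every other path falls through to storage.
   fn list_files(mdt_enabled: bool, mdt_fails: bool) -> Vec<String> {
       if mdt_enabled {
           match fetch_mdt_records(mdt_fails) {
               Ok(records) => return records,
               Err(e) => eprintln!(
                   "Failed to read from metadata table, falling back to storage listing: {e}"
               ),
           }
       }
       list_from_storage()
   }
   ```

   The fallback behavior is unchanged: a disabled or failing MDT both end up in
   the storage listing, without an intermediate `Option`.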



##########
crates/core/src/table/listing.rs:
##########
@@ -181,6 +222,105 @@ impl FileLister {
     }
 }
 
+/// Build FileGroups from MDT FilesPartitionRecords.
+///
+/// Parses file names from MDT records and constructs FileGroup objects.
+/// Only includes active (non-deleted) files.
+///
+/// # Arguments
+/// * `records` - MDT files partition records (partition_path -> 
FilesPartitionRecord)
+/// * `base_file_extension` - The base file format extension (e.g., "parquet", 
"hfile")
+/// * `completion_time_view` - Optional view to look up completion timestamps.
+///   - For v8+ tables: Pass a view to set completion timestamps on files
+///   - For v6 tables: Pass None (v6 does not track completion timestamps)
+///
+/// # Returns
+/// A map of partition paths to their FileGroups.
+pub fn build_file_groups_from_mdt_records<V: TimelineViewByCompletionTime>(
+    records: &HashMap<String, FilesPartitionRecord>,
+    base_file_extension: &str,
+    completion_time_view: Option<&V>,
+) -> Result<DashMap<String, Vec<FileGroup>>> {
+    let file_groups_map = DashMap::new();
+    let base_file_suffix = format!(".{}", base_file_extension);
+
+    for (partition_path, record) in records {
+        // Skip __all_partitions__ record - it lists partition names, not files
+        if record.is_all_partitions() {
+            continue;
+        }
+
+        let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
+        let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = 
HashMap::new();
+
+        for file_name in record.active_file_names() {
+            if file_name.starts_with('.') {
+                // Log file: starts with '.'
+                match LogFile::from_str(file_name) {
+                    Ok(mut log_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            log_file.completion_timestamp = view
+                                .get_completion_time(&log_file.timestamp)
+                                .map(|s| s.to_string());
+                        }
+                        file_id_to_log_files
+                            .entry(log_file.file_id.clone())
+                            .or_default()
+                            .push(log_file);
+                    }
+                    Err(e) => {
+                        // Skip files that can't be parsed (e.g., .cdc files 
not yet supported)
+                        log::warn!("Failed to parse log file from MDT: {}: 
{}", file_name, e);
+                    }
+                }
+            } else if file_name.ends_with(&base_file_suffix) {
+                // Base file: ends with base file extension
+                match BaseFile::from_str(file_name) {
+                    Ok(mut base_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            base_file.completion_timestamp = view
+                                
.get_completion_time(&base_file.commit_timestamp)
+                                .map(|s| s.to_string());
+                        }
+                        file_id_to_base_files
+                            .entry(base_file.file_id.clone())
+                            .or_default()
+                            .push(base_file);
+                    }
+                    Err(e) => {
+                        log::warn!("Failed to parse base file from MDT: {}: 
{}", file_name, e);
+                    }
+                }
+            }
+            // Skip files with unrecognized extensions
+        }
+
+        // Build FileGroups from parsed files
+        // Note: Currently only supports file groups with base files.
+        // TODO: Support file groups with only log files (P1 task)
+        let mut file_groups = Vec::new();
+        for (file_id, base_files) in file_id_to_base_files {
+            let mut fg = FileGroup::new(file_id.clone(), 
partition_path.clone());
+            fg.add_base_files(base_files)?;
+
+            // Add corresponding log files if any
+            if let Some(log_files) = file_id_to_log_files.remove(&file_id) {
+                fg.add_log_files(log_files)?;
+            }
+
+            file_groups.push(fg);
+        }
+
+        if !file_groups.is_empty() {
+            file_groups_map.insert(partition_path.clone(), file_groups);
+        }
+    }

Review Comment:
   `build_file_groups_from_mdt_records` processes partition records in a
   sequential `for` loop. For tables with many partitions and files, this could
   become a performance bottleneck. Since each partition is processed
   independently, consider handling partitions concurrently (e.g., with `rayon`
   or similar). This would mainly benefit large tables with thousands of
   partitions.
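
   As a rough illustration of per-partition parallelism, the sketch below uses
   `std::thread::scope` from the standard library instead of `rayon`, purely to
   stay dependency-free. `count_base_files` and `count_per_partition` are
   hypothetical, and the per-partition work is reduced to counting `.parquet`
   file names:

   ```rust
   use std::collections::HashMap;
   use std::thread;

   /// Hypothetical per-partition work: count names ending in ".parquet".
   fn count_base_files(files: &[String]) -> usize {
       files.iter().filter(|f| f.ends_with(".parquet")).count()
   }

   /// Process partitions on at most `workers` scoped threads.
   fn count_per_partition(
       records: &HashMap<String, Vec<String>>,
       workers: usize,
   ) -> HashMap<String, usize> {
       let entries: Vec<(&String, &Vec<String>)> = records.iter().collect();
       let workers = workers.max(1);
       // Ceiling division; at least 1 because `chunks(0)` would panic.
       let chunk_size = ((entries.len() + workers - 1) / workers).max(1);

       thread::scope(|s| {
           // Spawn all workers first, then join them.
           let handles: Vec<_> = entries
               .chunks(chunk_size)
               .map(|chunk| {
                   s.spawn(move || {
                       chunk
                           .iter()
                           .map(|(partition, files)| {
                               (partition.to_string(), count_base_files(files))
                           })
                           .collect::<Vec<_>>()
                   })
               })
               .collect();

           handles
               .into_iter()
               .flat_map(|h| h.join().expect("worker thread panicked"))
               .collect()
       })
   }
   ```

   Whether this pays off depends on how expensive the per-partition parsing
   really is; for small tables the thread overhead may outweigh the gain, so it
   would be worth measuring first.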



##########
crates/core/src/table/listing.rs:
##########
@@ -181,6 +222,105 @@ impl FileLister {
     }
 }
 
+/// Build FileGroups from MDT FilesPartitionRecords.
+///
+/// Parses file names from MDT records and constructs FileGroup objects.
+/// Only includes active (non-deleted) files.
+///
+/// # Arguments
+/// * `records` - MDT files partition records (partition_path -> 
FilesPartitionRecord)
+/// * `base_file_extension` - The base file format extension (e.g., "parquet", 
"hfile")
+/// * `completion_time_view` - Optional view to look up completion timestamps.
+///   - For v8+ tables: Pass a view to set completion timestamps on files
+///   - For v6 tables: Pass None (v6 does not track completion timestamps)
+///
+/// # Returns
+/// A map of partition paths to their FileGroups.
+pub fn build_file_groups_from_mdt_records<V: TimelineViewByCompletionTime>(
+    records: &HashMap<String, FilesPartitionRecord>,
+    base_file_extension: &str,
+    completion_time_view: Option<&V>,
+) -> Result<DashMap<String, Vec<FileGroup>>> {
+    let file_groups_map = DashMap::new();
+    let base_file_suffix = format!(".{}", base_file_extension);
+
+    for (partition_path, record) in records {
+        // Skip __all_partitions__ record - it lists partition names, not files
+        if record.is_all_partitions() {
+            continue;
+        }
+
+        let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new();
+        let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = HashMap::new();
+

Review Comment:
   The function documentation for `build_file_groups_from_mdt_records` states
   "Only includes active (non-deleted) files." The implementation calls
   `record.active_file_names()` and applies no further filtering. That is
   correct as long as `active_file_names()` already excludes deleted files, but
   a comment saying that the filtering is delegated to `active_file_names()`
   would make the intent clear.
   ```suggestion
   
           // Note: active_file_names() is expected to return only active (non-deleted) files.
   ```



##########
crates/core/src/table/mod.rs:
##########
@@ -237,6 +239,22 @@ impl Table {
             .into()
     }
 
+    /// Check if the metadata table "files" partition is enabled for file listing.
+    ///
+    /// Returns `true` if:
+    /// 1. `hoodie.metadata.enable` is true
+    /// 2. Table version is >= 8 (MDT support is only for v8+ tables)
+    /// 3. "files" is in the configured `hoodie.table.metadata.partitions`
+    pub fn is_metadata_table_enabled(&self) -> bool {
+        let metadata_enabled: bool = self.hudi_configs.get_or_default(MetadataTableEnabled).into();
+        let table_version: isize = self.hudi_configs.get(TableVersion).unwrap().into();

Review Comment:
   `is_metadata_table_enabled` calls `unwrap()` on the result of
   `get(TableVersion)`. If the `TableVersion` configuration is not set, this
   will panic. Consider using `get_or_default` (provided `TableVersion` defines
   a default value) or handling the missing value explicitly.
   ```suggestion
           let table_version: isize = self.hudi_configs.get_or_default(TableVersion).into();
   ```
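
   If no sensible default exists, another option is to treat a missing or
   unparsable version as "MDT not enabled". The sketch below shows that idea
   with a plain `HashMap<String, String>` standing in for `hudi_configs`; this
   is an assumption for illustration and not the crate's config API. The key
   `hoodie.table.version` is likewise assumed here:

   ```rust
   use std::collections::HashMap;

   /// Hypothetical, simplified version of the check: no `unwrap()`, so a
   /// missing or malformed table version yields `false` instead of a panic.
   fn is_metadata_table_enabled(configs: &HashMap<String, String>) -> bool {
       let metadata_enabled = configs
           .get("hoodie.metadata.enable")
           .map_or(false, |v| v == "true");

       // `None` if the key is absent or the value is not an integer.
       let table_version = configs
           .get("hoodie.table.version")
           .and_then(|v| v.parse::<isize>().ok());

       let files_partition_listed = configs
           .get("hoodie.table.metadata.partitions")
           .map_or(false, |v| v.split(',').any(|p| p.trim() == "files"));

       metadata_enabled
           && table_version.map_or(false, |v| v >= 8)
           && files_partition_listed
   }
   ```

   Returning `false` silently may hide a misconfigured table, so logging the
   missing version or returning a `Result` could be preferable; that choice is
   left to the author.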



##########
crates/core/src/table/mod.rs:
##########
@@ -237,6 +239,22 @@ impl Table {
             .into()
     }
 
+    /// Check if the metadata table "files" partition is enabled for file listing.
+    ///
+    /// Returns `true` if:
+    /// 1. `hoodie.metadata.enable` is true
+    /// 2. Table version is >= 8 (MDT support is only for v8+ tables)

Review Comment:
   The documentation states "MDT support is only for v8+ tables", and the code
   checks `table_version >= 8`, so version 8 and above are supported. It is
   unclear, however, whether table version 7 exists or whether there is a gap
   between v6 and v8. Consider clarifying this in the documentation, including
   why the check is `>= 8` rather than `> 6`.
   ```suggestion
       /// 2. Table version is >= 8 (versions < 8 are treated as not supporting MDT; 8 is the first MDT-compatible version)
   ```



##########
crates/core/src/table/listing.rs:
##########
@@ -181,6 +222,105 @@ impl FileLister {
     }
 }
 
+/// Build FileGroups from MDT FilesPartitionRecords.
+///
+/// Parses file names from MDT records and constructs FileGroup objects.
+/// Only includes active (non-deleted) files.
+///
+/// # Arguments
+/// * `records` - MDT files partition records (partition_path -> 
FilesPartitionRecord)
+/// * `base_file_extension` - The base file format extension (e.g., "parquet", 
"hfile")
+/// * `completion_time_view` - Optional view to look up completion timestamps.
+///   - For v8+ tables: Pass a view to set completion timestamps on files
+///   - For v6 tables: Pass None (v6 does not track completion timestamps)
+///
+/// # Returns
+/// A map of partition paths to their FileGroups.
+pub fn build_file_groups_from_mdt_records<V: TimelineViewByCompletionTime>(
+    records: &HashMap<String, FilesPartitionRecord>,
+    base_file_extension: &str,
+    completion_time_view: Option<&V>,
+) -> Result<DashMap<String, Vec<FileGroup>>> {
+    let file_groups_map = DashMap::new();
+    let base_file_suffix = format!(".{}", base_file_extension);
+
+    for (partition_path, record) in records {
+        // Skip __all_partitions__ record - it lists partition names, not files
+        if record.is_all_partitions() {
+            continue;
+        }
+
+        let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
+        let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = 
HashMap::new();
+
+        for file_name in record.active_file_names() {
+            if file_name.starts_with('.') {
+                // Log file: starts with '.'
+                match LogFile::from_str(file_name) {
+                    Ok(mut log_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            log_file.completion_timestamp = view
+                                .get_completion_time(&log_file.timestamp)
+                                .map(|s| s.to_string());
+                        }
+                        file_id_to_log_files
+                            .entry(log_file.file_id.clone())
+                            .or_default()
+                            .push(log_file);
+                    }
+                    Err(e) => {
+                        // Skip files that can't be parsed (e.g., .cdc files 
not yet supported)
+                        log::warn!("Failed to parse log file from MDT: {}: 
{}", file_name, e);
+                    }
+                }
+            } else if file_name.ends_with(&base_file_suffix) {
+                // Base file: ends with base file extension
+                match BaseFile::from_str(file_name) {
+                    Ok(mut base_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            base_file.completion_timestamp = view
+                                
.get_completion_time(&base_file.commit_timestamp)
+                                .map(|s| s.to_string());
+                        }
+                        file_id_to_base_files
+                            .entry(base_file.file_id.clone())
+                            .or_default()
+                            .push(base_file);
+                    }
+                    Err(e) => {
+                        log::warn!("Failed to parse base file from MDT: {}: 
{}", file_name, e);
+                    }
+                }
+            }
+            // Skip files with unrecognized extensions
+        }
+
+        // Build FileGroups from parsed files
+        // Note: Currently only supports file groups with base files.
+        // TODO: Support file groups with only log files (P1 task)
+        let mut file_groups = Vec::new();
+        for (file_id, base_files) in file_id_to_base_files {
+            let mut fg = FileGroup::new(file_id.clone(), 
partition_path.clone());
+            fg.add_base_files(base_files)?;
+
+            // Add corresponding log files if any
+            if let Some(log_files) = file_id_to_log_files.remove(&file_id) {
+                fg.add_log_files(log_files)?;
+            }
+
+            file_groups.push(fg);
+        }
+

Review Comment:
   The TODO comment at line 302 mentions "Support file groups with only log
   files (P1 task)". In the meantime, the implementation silently skips file
   groups that have only log files, because log files are attached only when a
   base file with the same file_id exists. This could lead to data silently
   missing from query results. Consider either implementing support for
   log-only file groups now or, at minimum, logging a warning when log files
   are skipped for lack of a base file, so users are aware of the limitation.
   ```suggestion
   
           // Any remaining entries in file_id_to_log_files correspond to file groups
           // that have only log files and no base files. These are currently not
           // supported, so we skip them but emit a warning to make this visible.
           if !file_id_to_log_files.is_empty() {
               let log_only_ids: Vec<String> = file_id_to_log_files.keys().cloned().collect();
               log::warn!(
                   "Skipping {} file group(s) with only log files for partition '{}': file_ids={:?}. \
                    Log-only file groups are not yet supported.",
                   log_only_ids.len(),
                   partition_path,
                   log_only_ids
               );
           }
   ```



##########
crates/core/src/table/listing.rs:
##########
@@ -181,6 +222,105 @@ impl FileLister {
     }
 }
 
+/// Build FileGroups from MDT FilesPartitionRecords.
+///
+/// Parses file names from MDT records and constructs FileGroup objects.
+/// Only includes active (non-deleted) files.
+///
+/// # Arguments
+/// * `records` - MDT files partition records (partition_path -> 
FilesPartitionRecord)
+/// * `base_file_extension` - The base file format extension (e.g., "parquet", 
"hfile")
+/// * `completion_time_view` - Optional view to look up completion timestamps.
+///   - For v8+ tables: Pass a view to set completion timestamps on files
+///   - For v6 tables: Pass None (v6 does not track completion timestamps)
+///
+/// # Returns
+/// A map of partition paths to their FileGroups.
+pub fn build_file_groups_from_mdt_records<V: TimelineViewByCompletionTime>(
+    records: &HashMap<String, FilesPartitionRecord>,
+    base_file_extension: &str,
+    completion_time_view: Option<&V>,
+) -> Result<DashMap<String, Vec<FileGroup>>> {
+    let file_groups_map = DashMap::new();
+    let base_file_suffix = format!(".{}", base_file_extension);
+
+    for (partition_path, record) in records {
+        // Skip __all_partitions__ record - it lists partition names, not files
+        if record.is_all_partitions() {
+            continue;
+        }
+
+        let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
+        let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = 
HashMap::new();
+
+        for file_name in record.active_file_names() {
+            if file_name.starts_with('.') {
+                // Log file: starts with '.'
+                match LogFile::from_str(file_name) {
+                    Ok(mut log_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            log_file.completion_timestamp = view
+                                .get_completion_time(&log_file.timestamp)
+                                .map(|s| s.to_string());
+                        }
+                        file_id_to_log_files
+                            .entry(log_file.file_id.clone())
+                            .or_default()
+                            .push(log_file);
+                    }
+                    Err(e) => {
+                        // Skip files that can't be parsed (e.g., .cdc files 
not yet supported)
+                        log::warn!("Failed to parse log file from MDT: {}: 
{}", file_name, e);
+                    }
+                }
+            } else if file_name.ends_with(&base_file_suffix) {
+                // Base file: ends with base file extension
+                match BaseFile::from_str(file_name) {
+                    Ok(mut base_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            base_file.completion_timestamp = view
+                                
.get_completion_time(&base_file.commit_timestamp)
+                                .map(|s| s.to_string());
+                        }
+                        file_id_to_base_files
+                            .entry(base_file.file_id.clone())
+                            .or_default()
+                            .push(base_file);
+                    }
+                    Err(e) => {
+                        log::warn!("Failed to parse base file from MDT: {}: 
{}", file_name, e);
+                    }
+                }
+            }
+            // Skip files with unrecognized extensions
+        }
+
+        // Build FileGroups from parsed files
+        // Note: Currently only supports file groups with base files.
+        // TODO: Support file groups with only log files (P1 task)
+        let mut file_groups = Vec::new();
+        for (file_id, base_files) in file_id_to_base_files {
+            let mut fg = FileGroup::new(file_id.clone(), 
partition_path.clone());
+            fg.add_base_files(base_files)?;
+
+            // Add corresponding log files if any
+            if let Some(log_files) = file_id_to_log_files.remove(&file_id) {
+                fg.add_log_files(log_files)?;
+            }
+
+            file_groups.push(fg);
+        }
+
+        if !file_groups.is_empty() {
+            file_groups_map.insert(partition_path.clone(), file_groups);
+        }
+    }

Review Comment:
   Log files parsed from MDT records that have no corresponding base file
(same file_id) are silently dropped. The loop iterates over
`file_id_to_base_files` and removes matching entries from
`file_id_to_log_files`, but any entries left in `file_id_to_log_files`
afterwards are never processed. If log-only file groups exist, reads could
miss their data. Consider logging a warning for any remaining log files after
all file groups have been built, so users are aware that some log files were
skipped.
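
   A minimal sketch of the suggested warning, under stated assumptions:
   `ParsedLogFile` and `attach_logs_and_collect_orphans` are hypothetical
   stand-ins (not hudi-rs APIs), and `eprintln!` replaces `log::warn!` so the
   snippet has no dependencies. It only shows how leftover map entries can be
   surfaced after the base-file loop.

   ```rust
   use std::collections::HashMap;

   /// Hypothetical stand-in for a parsed log file; only the name is kept.
   #[derive(Debug, Clone, PartialEq)]
   pub struct ParsedLogFile {
       pub file_name: String,
   }

   /// Attaches log files to the file groups that have a base file and
   /// returns the file_ids whose log files were left without one.
   pub fn attach_logs_and_collect_orphans(
       base_file_ids: &[&str],
       mut file_id_to_log_files: HashMap<String, Vec<ParsedLogFile>>,
   ) -> (HashMap<String, Vec<ParsedLogFile>>, Vec<String>) {
       let mut attached = HashMap::new();
       for file_id in base_file_ids {
           // Mirrors the `remove(&file_id)` call in the loop under review.
           let logs = file_id_to_log_files.remove(*file_id).unwrap_or_default();
           attached.insert(file_id.to_string(), logs);
       }

       // Anything still in the map has no base file with the same file_id.
       let mut orphans: Vec<String> = file_id_to_log_files.into_keys().collect();
       orphans.sort(); // stable order for logging
       for file_id in &orphans {
           eprintln!("Skipping log files for file group {}: no base file found", file_id);
       }
       (attached, orphans)
   }
   ```

   Returning the orphaned file_ids, rather than only logging them, would also
   let a caller decide later whether to build log-only file groups.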



##########
crates/core/src/table/mod.rs:
##########
@@ -533,18 +551,76 @@ impl Table {
         timestamp: &str,
         filters: &[Filter],
     ) -> Result<Vec<FileSlice>> {
+        // Create completion time view for setting completion timestamps on files
+        let completion_time_view = self.timeline.create_completion_time_view();
+
+        // File slices are keyed by commit_timestamp (request/base instant time).
         let excludes = self
             .timeline
             .get_replaced_file_groups_as_of(timestamp)
             .await?;
         let partition_schema = self.get_partition_schema().await?;
         let partition_pruner =
             PartitionPruner::new(filters, &partition_schema, self.hudi_configs.as_ref())?;
+
+        // Try MDT-accelerated listing if files partition is configured
+        let mdt_records = if self.is_metadata_table_enabled() {
+            match self.fetch_mdt_records_if_valid(timestamp).await {
+                Ok(records) => Some(records),
+                Err(e) => {
+                    // Fall through to storage listing if MDT fails
+                    log::warn!(
+                        "Failed to read file slices from metadata table, falling back to storage listing: {}",
+                        e
+                    );
+                    None
+                }
+            }
+        } else {
+            None
+        };
+
+        // Use the single entrypoint that handles both MDT and storage listing
         self.file_system_view
-            .get_file_slices_as_of(timestamp, &partition_pruner, &excludes)
+            .get_file_slices_as_of(
+                timestamp,
+                &partition_pruner,
+                &excludes,
+                mdt_records.as_ref(),
+                Some(&completion_time_view),
+            )
             .await
     }
 
+    /// Fetch MDT records if the MDT is up-to-date for the query timestamp.
+    ///
+    /// Returns an error if the MDT is behind the query timestamp, indicating
+    /// that the caller should fall back to storage listing.
+    async fn fetch_mdt_records_if_valid(
+        &self,
+        timestamp: &str,
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        let mdt = self.new_metadata_table().await?;
+
+        // Check if MDT is up-to-date enough for the query timestamp
+        // If the MDT's latest commit is before the query timestamp, the MDT might
+        // be missing files from later commits, so we should fall back to storage.
+        let mdt_latest_ts = mdt.timeline.get_latest_commit_timestamp_as_option();
+        match mdt_latest_ts {
+            Some(mdt_ts) if mdt_ts >= timestamp => {
+                // MDT is up-to-date, proceed with MDT listing
+            }
+            _ => {
+                return Err(CoreError::MetadataTable(format!(
+                    "MDT latest timestamp {:?} is behind query timestamp {}, falling back to storage listing",
+                    mdt_latest_ts, timestamp

Review Comment:
   The error message formats `mdt_latest_ts` with `{:?}`, so the `Option`
wrapper (`Some(...)` or `None`) appears in the message, while `timestamp` is
formatted with `{}`. Consider unwrapping the Option so the message shows just
the timestamp value, which is more readable.
   ```suggestion
           let mdt_latest_ts_display = mdt_latest_ts.unwrap_or("None");
           match mdt_latest_ts {
               Some(mdt_ts) if mdt_ts >= timestamp => {
                   // MDT is up-to-date, proceed with MDT listing
               }
               _ => {
                   return Err(CoreError::MetadataTable(format!(
                       "MDT latest timestamp {} is behind query timestamp {}, falling back to storage listing",
                       mdt_latest_ts_display, timestamp
   ```
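
   To illustrate the difference the comment describes, here is a small
   sketch. It assumes `mdt_latest_ts` is an `Option<&str>`, which the diff
   does not confirm, and both function names are hypothetical. It also gives
   the `None` case its own wording, since "is behind" does not describe an
   MDT with no commits.

   ```rust
   /// Formats with `{:?}`, as in the code under review: the Option wrapper
   /// and the quotes around the string end up in the message.
   pub fn debug_style_message(mdt_latest_ts: Option<&str>, timestamp: &str) -> String {
       format!(
           "MDT latest timestamp {:?} is behind query timestamp {}",
           mdt_latest_ts, timestamp
       )
   }

   /// Unwraps the Option first, so only the timestamp value is shown.
   pub fn display_style_message(mdt_latest_ts: Option<&str>, timestamp: &str) -> String {
       match mdt_latest_ts {
           Some(ts) => format!(
               "MDT latest timestamp {} is behind query timestamp {}",
               ts, timestamp
           ),
           None => format!("MDT has no commits yet; query timestamp is {}", timestamp),
       }
   }
   ```

   If the real type turns out to be `Option<String>`, `as_deref()` would give
   the `Option<&str>` used here without moving the value.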



##########
crates/core/src/file_group/log_file/mod.rs:
##########
@@ -177,12 +199,35 @@ impl PartialOrd for LogFile {
 
 impl Ord for LogFile {
     fn cmp(&self, other: &Self) -> Ordering {
-        // Compare fields in order: timestamp, version, write_token
+        // For ordering, use completion_timestamp when available.
+        // Files with completion_timestamp are considered earlier than those without.
+        // If both have completion_timestamp, compare by completion_timestamp.
+        // If both lack completion_timestamp, compare by request timestamp.
         // TODO support `.cdc` suffix
-        self.timestamp
-            .cmp(&other.timestamp)
-            .then(self.version.cmp(&other.version))
-            .then(self.write_token.cmp(&other.write_token))
+        match (&self.completion_timestamp, &other.completion_timestamp) {
+            (Some(ct1), Some(ct2)) => {
+                // Both completed: compare by completion timestamp, then version, then write_token
+                ct1.cmp(ct2)
+                    .then(self.version.cmp(&other.version))
+                    .then(self.write_token.cmp(&other.write_token))
+            }
+            (Some(_), None) => {
+                // Self is completed, other is pending: self comes first
+                Ordering::Less
+            }
+            (None, Some(_)) => {
+                // Self is pending, other is completed: other comes first
+                Ordering::Greater
+            }
+            (None, None) => {
+                // Both pending or both v6 (no completion_timestamp set yet):
+                // compare by request timestamp for deterministic ordering
+                self.timestamp
+                    .cmp(&other.timestamp)
+                    .then(self.version.cmp(&other.version))
+                    .then(self.write_token.cmp(&other.write_token))
+            }
+        }
     }

Review Comment:
   The `Ord` implementation for `LogFile` has complex branching logic for
handling completion timestamps. While the implementation appears correct, the
ordering semantics could be surprising: completed files always sort before
pending files (those without a completion_timestamp), regardless of their
request timestamps. This means a log file with completion_timestamp="t100"
would sort before a log file with completion_timestamp=None and
timestamp="t50". Consider adding a module-level documentation comment
explaining this ordering behavior, as it's a critical part of the v8+ table
semantics and may not be immediately obvious to code maintainers.
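
   The behavior described above can be reproduced in a standalone sketch.
   `SketchLogFile` is a hypothetical, simplified stand-in for `LogFile`
   (`write_token` is omitted and `version` is assumed to be an integer), and
   `sorted_request_timestamps` is a helper added only for illustration.

   ```rust
   use std::cmp::Ordering;

   /// Simplified, hypothetical stand-in for `LogFile`.
   #[derive(Debug, Clone)]
   pub struct SketchLogFile {
       pub timestamp: String,                    // request instant time
       pub completion_timestamp: Option<String>, // None while pending (or on v6)
       pub version: u32,
   }

   impl Ord for SketchLogFile {
       fn cmp(&self, other: &Self) -> Ordering {
           match (&self.completion_timestamp, &other.completion_timestamp) {
               // Both completed: order by completion time, then version.
               (Some(a), Some(b)) => a.cmp(b).then(self.version.cmp(&other.version)),
               // Completed always sorts before pending.
               (Some(_), None) => Ordering::Less,
               (None, Some(_)) => Ordering::Greater,
               // Neither completed: fall back to the request timestamp.
               (None, None) => self
                   .timestamp
                   .cmp(&other.timestamp)
                   .then(self.version.cmp(&other.version)),
           }
       }
   }

   impl PartialOrd for SketchLogFile {
       fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
           Some(self.cmp(other))
       }
   }

   // Equality is defined through `cmp` so that `Eq` and `Ord` stay consistent.
   impl PartialEq for SketchLogFile {
       fn eq(&self, other: &Self) -> bool {
           self.cmp(other) == Ordering::Equal
       }
   }

   impl Eq for SketchLogFile {}

   /// Sorts the files and returns their request timestamps in sorted order.
   pub fn sorted_request_timestamps(mut files: Vec<SketchLogFile>) -> Vec<String> {
       files.sort();
       files.into_iter().map(|f| f.timestamp).collect()
   }
   ```

   With one completed file (request `t090`, completion `t100`) and one
   pending file (request `t050`), the completed file sorts first even though
   its request timestamp is later.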



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.