Copilot commented on code in PR #501:
URL: https://github.com/apache/hudi-rs/pull/501#discussion_r2648544502


##########
crates/core/src/timeline/completion_time.rs:
##########
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Completion time query view for looking up completion timestamps from 
request timestamps.
+//!
+//! This module provides the [`TimelineViewByCompletionTime`] trait and 
implementations
+//! for mapping request timestamps to completion timestamps. This is essential 
for
+//! v8+ tables where file names contain request timestamps but file ordering 
and
+//! association must be based on completion timestamps.
+//!
+//! # Table Version Differences
+//!
+//! - **v6 tables**: Completion timestamps are not tracked. Use 
[`V6CompletionTimeView`]
+//!   which returns `None` for all lookups (completion_timestamp remains 
unset).
+//!
+//! - **v8+ tables**: Request and completion timestamps are different. The 
completion timestamp
+//!   is stored in the timeline instant filename as 
`{request}_{completion}.{action}`.
+//!   Use [`CompletionTimeView`] to look up completion timestamps from a map.
+
+use crate::timeline::instant::Instant;
+use std::collections::HashMap;
+
+/// A view for querying completion timestamps from request timestamps.
+///
+/// This trait abstracts the completion time lookup logic to support both
+/// v6 tables (where completion time is not tracked) and v8+ tables (where
+/// request and completion timestamps differ).
+pub trait TimelineViewByCompletionTime {
+    /// Get the completion timestamp for a given request timestamp.
+    ///
+    /// Returns `Some(completion_timestamp)` if the request timestamp 
corresponds
+    /// to a completed commit, `None` if the commit is pending/unknown or if
+    /// completion time is not tracked (v6 tables).
+    fn get_completion_time(&self, request_timestamp: &str) -> Option<&str>;
+
+    /// Returns true if uncommitted files should be filtered out.
+    ///
+    /// For v8+ tables, this returns true because we track completion times
+    /// and can distinguish committed from uncommitted files.
+    /// For v6 tables, this returns false because completion time is not 
tracked.
+    fn should_filter_uncommitted(&self) -> bool {
+        false
+    }
+}
+
+/// Completion time view for v6 tables.
+///
+/// In v6, completion time is not tracked separately from request time.
+/// This view always returns `None`, meaning the caller should use the
+/// request timestamp directly for any completion-time-based logic.
+#[derive(Debug, Default)]
+pub struct V6CompletionTimeView;
+
+impl V6CompletionTimeView {
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+impl TimelineViewByCompletionTime for V6CompletionTimeView {
+    fn get_completion_time(&self, _request_timestamp: &str) -> Option<&str> {
+        None
+    }
+}
+
+/// Completion time view for v8+ tables backed by a HashMap.
+///
+/// This view is built from timeline instants and maps request timestamps
+/// to their corresponding completion timestamps.
+#[derive(Debug)]
+pub struct CompletionTimeView {
+    /// Map from request timestamp to completion timestamp
+    request_to_completion: HashMap<String, String>,
+}
+
+impl CompletionTimeView {
+    /// Create a new completion time view from timeline instants.
+    ///
+    /// Only completed instants with a completion timestamp are included.
+    pub fn from_instants<'a, I>(instants: I) -> Self
+    where
+        I: IntoIterator<Item = &'a Instant>,
+    {
+        let request_to_completion = instants
+            .into_iter()
+            .filter_map(|instant| {
+                instant
+                    .completed_timestamp
+                    .as_ref()
+                    .map(|completion_ts| (instant.timestamp.clone(), 
completion_ts.clone()))
+            })
+            .collect();
+
+        Self {
+            request_to_completion,
+        }
+    }
+
+    /// Create a new empty completion time view.
+    pub fn empty() -> Self {
+        Self {
+            request_to_completion: HashMap::new(),
+        }
+    }
+
+    /// Check if this view has any completion time mappings.
+    pub fn is_empty(&self) -> bool {
+        self.request_to_completion.is_empty()
+    }
+
+    /// Get the number of completion time mappings.
+    pub fn len(&self) -> usize {
+        self.request_to_completion.len()
+    }
+}
+
+impl TimelineViewByCompletionTime for CompletionTimeView {
+    fn get_completion_time(&self, request_timestamp: &str) -> Option<&str> {
+        self.request_to_completion
+            .get(request_timestamp)
+            .map(|s| s.as_str())
+    }
+
+    fn should_filter_uncommitted(&self) -> bool {
+        // Only filter if we have completion time data (v8+ table)
+        !self.is_empty()
+    }
+}

Review Comment:
   The test coverage for `should_filter_uncommitted()` is missing. This is a 
critical method that determines whether uncommitted files should be filtered 
out. Consider adding tests that verify:
   1. V6CompletionTimeView returns false (v6 tables don't filter uncommitted 
files)
   2. Empty CompletionTimeView returns false 
   3. Non-empty CompletionTimeView returns true



##########
crates/core/src/table/mod.rs:
##########
@@ -533,18 +559,81 @@ impl Table {
         timestamp: &str,
         filters: &[Filter],
     ) -> Result<Vec<FileSlice>> {
+        // Create completion time view for setting completion timestamps on 
files.
+        // For v8+ tables, this populates completion timestamps and enables 
filtering.
+        // For v6 tables, the view is empty and acts as a no-op.
+        let completion_time_view = self.timeline.create_completion_time_view();
+
+        // File slices are keyed by commit_timestamp (request/base instant 
time).
         let excludes = self
             .timeline
             .get_replaced_file_groups_as_of(timestamp)
             .await?;
         let partition_schema = self.get_partition_schema().await?;
         let partition_pruner =
             PartitionPruner::new(filters, &partition_schema, 
self.hudi_configs.as_ref())?;
+
+        // Try metadata table accelerated listing if files partition is 
configured
+        let metadata_table_records = if self.is_metadata_table_enabled() {
+            match self.fetch_metadata_table_records_if_valid(timestamp).await {
+                Ok(records) => Some(records),
+                Err(e) => {
+                    // Fall through to storage listing if metadata table read 
fails
+                    log::warn!(
+                        "Failed to read file slices from metadata table, 
falling back to storage listing: {}",
+                        e
+                    );
+                    None
+                }
+            }
+        } else {
+            None
+        };
+
+        // Use the single entrypoint that handles both metadata table and 
storage listing
         self.file_system_view
-            .get_file_slices_as_of(timestamp, &partition_pruner, &excludes)
+            .get_file_slices_as_of(
+                timestamp,
+                &partition_pruner,
+                &excludes,
+                metadata_table_records.as_ref(),
+                Some(&completion_time_view),
+            )
             .await
     }
 
+    /// Fetch metadata table records if the metadata table is up-to-date for 
the query timestamp.
+    ///
+    /// Returns an error if the metadata table is behind the query timestamp, 
indicating
+    /// that the caller should fall back to storage listing.
+    async fn fetch_metadata_table_records_if_valid(
+        &self,
+        timestamp: &str,
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        let metadata_table = self.new_metadata_table().await?;
+
+        // Check if metadata table is up-to-date enough for the query 
timestamp.
+        // If the metadata table's latest commit is before the query 
timestamp, it might
+        // be missing files from later commits, so we should fall back to 
storage.

Review Comment:
   The comment on line 616 says "If the metadata table's latest commit is 
before the query timestamp" but the condition on line 622 checks `latest_ts >= 
timestamp`, which means metadata table timestamp is greater than or equal to 
query timestamp. This is correct logic (metadata table should be at least as 
current as the query), but the comment is misleading.
   
   The comment should say "If the metadata table's latest commit is not at 
least as recent as the query timestamp" or "If the query timestamp is after the 
metadata table's latest commit".
   ```suggestion
           // If the metadata table's latest commit is not at least as recent 
as the query
           // timestamp, it might be missing files from later commits, so we 
should fall back to storage.
   ```



##########
crates/core/src/file_group/mod.rs:
##########
@@ -170,28 +175,50 @@ impl FileGroup {
 
     /// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup].
     ///
-    /// For table version 6 (pre-1.0 spec):
-    ///   Log file's timestamp matches the base file's commit timestamp 
exactly.
+    /// File slices are keyed by `commit_timestamp` (request/base instant 
time).
+    /// The log file association logic:
     ///
-    /// For table version 8+ (1.0 spec):
-    ///   Log file's timestamp is its own write instant, which may differ from 
the base file's
-    ///   timestamp. We find the FileSlice with the closest timestamp <= log 
file's timestamp.
+    /// - **With completion_timestamp (v8+ tables)**: Find the file slice with 
the largest
+    ///   `commit_timestamp` (base instant time) that is <= log's 
`completion_timestamp`.
+    ///
+    /// - **Without completion_timestamp (v6 tables)**: Use exact matching or 
range lookup
+    ///   based on log timestamp.
     ///
     /// TODO: support adding log files to file group without base files.
     pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> {
-        let log_timestamp = log_file.timestamp.as_str();
+        // If log file has completion_timestamp, use completion-time-based 
association
+        // File slices are keyed by commit_timestamp (base instant time)
+        // Find the largest base instant time < log's completion time
+        if let Some(log_completion_time) = &log_file.completion_timestamp {
+            // Find file slice with largest base instant time
+            // (commit_timestamp) < log's completion time

Review Comment:
   The documentation comment on line 182 states to find file slices with 
commit_timestamp "<= log's completion_timestamp", but the comment on line 191 
says "Find the largest base instant time < log's completion time" (note the 
strict less-than).
   
   There's an inconsistency between these two comments. The code implementation 
uses exclusive range `..` (strict less-than), which matches line 191 but 
contradicts line 182. Please clarify the intended behavior and ensure the 
documentation is consistent.



##########
crates/core/src/table/mod.rs:
##########
@@ -237,6 +239,30 @@ impl Table {
             .into()
     }
 
+    /// Check if the metadata table "files" partition is enabled for file 
listing.
+    ///
+    /// Returns `true` if:
+    /// 1. `hoodie.metadata.enable` is true
+    /// 2. Table version is >= 8 (MDT support is only for v8+ tables)
+    /// 3. "files" is in the configured `hoodie.table.metadata.partitions`
+    pub fn is_metadata_table_enabled(&self) -> bool {
+        let metadata_enabled: bool = self
+            .hudi_configs
+            .get_or_default(MetadataTableEnabled)
+            .into();
+        // TODO: remove this table version check when support files for v6 
tables or drop v6 support
+        let table_version: isize = self
+            .hudi_configs
+            .get(TableVersion)
+            .map(|v| v.into())
+            .unwrap_or(0);
+        metadata_enabled
+            && table_version >= 8
+            && self
+                .get_metadata_table_partitions()
+                .contains(&FilesPartitionRecord::PARTITION_NAME.to_string())
+    }

Review Comment:
   The function name `is_metadata_table_enabled` might be misleading. This 
function checks if the metadata table *files partition* is enabled and ready 
for use, not just whether the metadata table feature is enabled. The function 
checks three conditions including table version and partition configuration.
   
   Consider renaming to `is_metadata_files_partition_enabled` or 
`can_use_metadata_table_for_files` to be more specific about what it checks.



##########
crates/core/src/file_group/mod.rs:
##########
@@ -170,28 +175,50 @@ impl FileGroup {
 
     /// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup].
     ///
-    /// For table version 6 (pre-1.0 spec):
-    ///   Log file's timestamp matches the base file's commit timestamp 
exactly.
+    /// File slices are keyed by `commit_timestamp` (request/base instant 
time).
+    /// The log file association logic:
     ///
-    /// For table version 8+ (1.0 spec):
-    ///   Log file's timestamp is its own write instant, which may differ from 
the base file's
-    ///   timestamp. We find the FileSlice with the closest timestamp <= log 
file's timestamp.
+    /// - **With completion_timestamp (v8+ tables)**: Find the file slice with 
the largest
+    ///   `commit_timestamp` (base instant time) that is <= log's 
`completion_timestamp`.
+    ///
+    /// - **Without completion_timestamp (v6 tables)**: Use exact matching or 
range lookup
+    ///   based on log timestamp.
     ///
     /// TODO: support adding log files to file group without base files.
     pub fn add_log_file(&mut self, log_file: LogFile) -> Result<&Self> {
-        let log_timestamp = log_file.timestamp.as_str();
+        // If log file has completion_timestamp, use completion-time-based 
association
+        // File slices are keyed by commit_timestamp (base instant time)
+        // Find the largest base instant time < log's completion time
+        if let Some(log_completion_time) = &log_file.completion_timestamp {
+            // Find file slice with largest base instant time
+            // (commit_timestamp) < log's completion time
+            if let Some((_, file_slice)) = self
+                .file_slices
+                .range_mut(..log_completion_time.clone())

Review Comment:
   The range query uses `..log_completion_time.clone()` which is exclusive of 
the upper bound. According to the documentation comment on line 182, this 
should find file slices with `commit_timestamp` that is "<= log's 
completion_timestamp", but the exclusive range only includes values strictly 
less than the log's completion time.
   
   This means if a base file's commit_timestamp equals the log file's 
completion_timestamp, they won't be associated, which could be incorrect. 
Consider using an inclusive range `..=` instead of exclusive `..`.
   ```suggestion
           // Find the largest base instant time <= log's completion time
           if let Some(log_completion_time) = &log_file.completion_timestamp {
               // Find file slice with largest base instant time
               // (commit_timestamp) <= log's completion time
               if let Some((_, file_slice)) = self
                   .file_slices
                   .range_mut(..=log_completion_time.clone())
   ```



##########
crates/core/src/file_group/builder.rs:
##########
@@ -113,6 +136,109 @@ pub fn build_replaced_file_groups(
     Ok(file_groups)
 }
 
+/// Build FileGroups from metadata table FilesPartitionRecords.
+///
+/// Parses file names from metadata table records and constructs FileGroup 
objects.
+/// Only includes active (non-deleted) files.
+///
+/// # Arguments
+/// * `records` - Metadata table files partition records (partition_path -> 
FilesPartitionRecord)
+/// * `base_file_extension` - The base file format extension (e.g., "parquet", 
"hfile")
+/// * `completion_time_view` - Optional view to look up completion timestamps.
+///   - For v8+ tables: Pass a view to set completion timestamps on files
+///   - For v6 tables: Pass None (v6 does not track completion timestamps)
+///
+/// # Returns
+/// A map of partition paths to their FileGroups.
+pub fn file_groups_from_files_partition_records<V: 
TimelineViewByCompletionTime>(
+    records: &HashMap<String, FilesPartitionRecord>,
+    base_file_extension: &str,
+    completion_time_view: Option<&V>,
+) -> Result<DashMap<String, Vec<FileGroup>>> {
+    let file_groups_map = DashMap::new();
+    let base_file_suffix = format!(".{}", base_file_extension);
+
+    for (partition_path, record) in records {
+        // Skip __all_partitions__ record - it lists partition names, not files
+        if record.is_all_partitions() {
+            continue;
+        }
+
+        let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
+        let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = 
HashMap::new();
+
+        for file_name in record.active_file_names() {
+            if file_name.starts_with('.') {
+                // Log file: starts with '.'
+                match LogFile::from_str(file_name) {
+                    Ok(mut log_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            log_file.set_completion_time(view);
+                        }
+                        file_id_to_log_files
+                            .entry(log_file.file_id.clone())
+                            .or_default()
+                            .push(log_file);
+                    }
+                    Err(e) => {
+                        // Skip files that can't be parsed (e.g., .cdc files 
not yet supported)
+                        log::warn!(
+                            "Failed to parse log file from metadata table: {}: 
{}",
+                            file_name,
+                            e
+                        );
+                    }
+                }
+            } else if file_name.ends_with(&base_file_suffix) {
+                // Base file: ends with base file extension
+                match BaseFile::from_str(file_name) {
+                    Ok(mut base_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            base_file.set_completion_time(view);
+                        }
+                        file_id_to_base_files
+                            .entry(base_file.file_id.clone())
+                            .or_default()
+                            .push(base_file);
+                    }
+                    Err(e) => {
+                        log::warn!(
+                            "Failed to parse base file from metadata table: 
{}: {}",
+                            file_name,
+                            e
+                        );
+                    }
+                }
+            }
+            // Skip files with unrecognized extensions
+        }
+
+        // Build FileGroups from parsed files
+        // Note: Currently only supports file groups with base files.
+        // TODO: Support file groups with only log files (P1 task)
+        let mut file_groups = Vec::new();
+        for (file_id, base_files) in file_id_to_base_files {
+            let mut fg = FileGroup::new(file_id.clone(), 
partition_path.clone());
+            fg.add_base_files(base_files)?;
+
+            // Add corresponding log files if any
+            if let Some(log_files) = file_id_to_log_files.remove(&file_id) {
+                fg.add_log_files(log_files)?;
+            }
+
+            file_groups.push(fg);
+        }
+
+        if !file_groups.is_empty() {
+            file_groups_map.insert(partition_path.clone(), file_groups);
+        }
+    }
+
+    Ok(file_groups_map)
+}

Review Comment:
   The new function `file_groups_from_files_partition_records` (lines 153-240) 
lacks test coverage. This is a critical function that converts metadata table 
records to file groups with completion timestamp handling. Consider adding 
tests that verify:
   1. Parsing base files and log files from metadata table records
   2. Setting completion timestamps correctly via the completion_time_view
   3. Filtering uncommitted files when appropriate
   4. Handling the __all_partitions__ record correctly
   5. Handling parse errors gracefully



##########
crates/core/src/file_group/builder.rs:
##########
@@ -113,6 +136,109 @@ pub fn build_replaced_file_groups(
     Ok(file_groups)
 }
 
+/// Build FileGroups from metadata table FilesPartitionRecords.
+///
+/// Parses file names from metadata table records and constructs FileGroup 
objects.
+/// Only includes active (non-deleted) files.
+///
+/// # Arguments
+/// * `records` - Metadata table files partition records (partition_path -> 
FilesPartitionRecord)
+/// * `base_file_extension` - The base file format extension (e.g., "parquet", 
"hfile")
+/// * `completion_time_view` - Optional view to look up completion timestamps.
+///   - For v8+ tables: Pass a view to set completion timestamps on files
+///   - For v6 tables: Pass None (v6 does not track completion timestamps)
+///
+/// # Returns
+/// A map of partition paths to their FileGroups.
+pub fn file_groups_from_files_partition_records<V: 
TimelineViewByCompletionTime>(
+    records: &HashMap<String, FilesPartitionRecord>,
+    base_file_extension: &str,
+    completion_time_view: Option<&V>,
+) -> Result<DashMap<String, Vec<FileGroup>>> {
+    let file_groups_map = DashMap::new();
+    let base_file_suffix = format!(".{}", base_file_extension);
+
+    for (partition_path, record) in records {
+        // Skip __all_partitions__ record - it lists partition names, not files
+        if record.is_all_partitions() {
+            continue;
+        }
+
+        let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
+        let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = 
HashMap::new();
+
+        for file_name in record.active_file_names() {
+            if file_name.starts_with('.') {
+                // Log file: starts with '.'
+                match LogFile::from_str(file_name) {
+                    Ok(mut log_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            log_file.set_completion_time(view);
+                        }
+                        file_id_to_log_files
+                            .entry(log_file.file_id.clone())
+                            .or_default()
+                            .push(log_file);
+                    }
+                    Err(e) => {
+                        // Skip files that can't be parsed (e.g., .cdc files 
not yet supported)
+                        log::warn!(
+                            "Failed to parse log file from metadata table: {}: 
{}",
+                            file_name,
+                            e
+                        );
+                    }
+                }
+            } else if file_name.ends_with(&base_file_suffix) {
+                // Base file: ends with base file extension
+                match BaseFile::from_str(file_name) {
+                    Ok(mut base_file) => {
+                        // Look up completion timestamp from the view if 
provided
+                        if let Some(view) = completion_time_view {
+                            base_file.set_completion_time(view);
+                        }
+                        file_id_to_base_files
+                            .entry(base_file.file_id.clone())
+                            .or_default()
+                            .push(base_file);

Review Comment:
   The code should filter out uncommitted files when processing metadata table 
records for v8+ tables. Currently, files are parsed and added to file groups 
even if they have no completion timestamp (uncommitted files). This could lead 
to uncommitted files being included in query results when using metadata table 
acceleration.
   
   Consider filtering uncommitted files similar to how it's done in the storage 
listing path (listing.rs lines 99-104, 117-122) by checking if 
`should_filter_uncommitted()` is true and the file's completion_timestamp is 
None.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to