This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e3a2cf  refactor: add targeted MDT lookup APIs for partition pruning (#502)
3e3a2cf is described below

commit 3e3a2cf11bb72b8d12d1271d56f82f7385304d78
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Dec 30 06:09:53 2025 -0600

    refactor: add targeted MDT lookup APIs for partition pruning (#502)
---
 crates/core/src/file_group/base_file.rs            |   4 +-
 crates/core/src/file_group/builder.rs              | 165 +++++---
 crates/core/src/file_group/log_file/content.rs     |   2 +-
 crates/core/src/file_group/log_file/mod.rs         |   4 +-
 crates/core/src/file_group/log_file/scanner.rs     |   6 +-
 crates/core/src/file_group/reader.rs               | 303 ++++----------
 crates/core/src/hfile/reader.rs                    |  14 +-
 crates/core/src/hfile/record.rs                    |  10 +-
 crates/core/src/metadata/merger.rs                 | 102 ++++-
 crates/core/src/metadata/mod.rs                    |   3 +-
 crates/core/src/metadata/table/mod.rs              | 441 +++++++++++++++++++++
 .../metadata/{table_record.rs => table/records.rs} |  73 +++-
 crates/core/src/table/fs_view.rs                   | 205 +++++-----
 crates/core/src/table/listing.rs                   |  22 +-
 crates/core/src/table/mod.rs                       | 430 +-------------------
 crates/core/src/timeline/completion_time.rs        | 202 +---------
 crates/core/src/timeline/mod.rs                    |  68 ++--
 crates/core/src/timeline/view.rs                   | 312 +++++++++++++++
 18 files changed, 1307 insertions(+), 1059 deletions(-)

diff --git a/crates/core/src/file_group/base_file.rs b/crates/core/src/file_group/base_file.rs
index 258b837..389afe6 100644
--- a/crates/core/src/file_group/base_file.rs
+++ b/crates/core/src/file_group/base_file.rs
@@ -18,7 +18,7 @@
  */
 use crate::error::CoreError;
 use crate::storage::file_metadata::FileMetadata;
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
+use crate::timeline::completion_time::CompletionTimeView;
 use crate::Result;
 use std::fmt::Display;
 use std::str::FromStr;
@@ -106,7 +106,7 @@ impl BaseFile {
     ///
     /// For v6 tables, the view returns `None` and this is a no-op.
     /// For v8+ tables, this sets the completion timestamp for completed 
commits.
-    pub fn set_completion_time<V: TimelineViewByCompletionTime>(&mut self, 
view: &V) {
+    pub fn set_completion_time<V: CompletionTimeView>(&mut self, view: &V) {
         self.completion_timestamp = view
             .get_completion_time(&self.commit_timestamp)
             .map(|s| s.to_string());
diff --git a/crates/core/src/file_group/builder.rs b/crates/core/src/file_group/builder.rs
index 2c3374b..3df044f 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -23,7 +23,7 @@ use crate::file_group::FileGroup;
 use crate::metadata::commit::HoodieCommitMetadata;
 use crate::metadata::replace_commit::HoodieReplaceCommitMetadata;
 use crate::metadata::table_record::FilesPartitionRecord;
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
+use crate::timeline::completion_time::CompletionTimeView;
 use crate::Result;
 use dashmap::DashMap;
 use serde_json::{Map, Value};
@@ -64,12 +64,7 @@ impl FileGroupMerger for HashSet<FileGroup> {
 ///
 /// * `commit_metadata` - The commit metadata JSON map
 /// * `completion_time_view` - View to look up completion timestamps.
-///   - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for all 
lookups)
-///   - For v8+ tables: Pass [`CompletionTimeView`] built from timeline 
instants
-///
-/// [`V6CompletionTimeView`]: 
crate::timeline::completion_time::V6CompletionTimeView
-/// [`CompletionTimeView`]: 
crate::timeline::completion_time::CompletionTimeView
-pub fn file_groups_from_commit_metadata<V: TimelineViewByCompletionTime>(
+pub fn file_groups_from_commit_metadata<V: CompletionTimeView>(
     commit_metadata: &Map<String, Value>,
     completion_time_view: &V,
 ) -> Result<HashSet<FileGroup>> {
@@ -153,15 +148,10 @@ pub fn replaced_file_groups_from_replace_commit(
 /// * `records` - Metadata table files partition records (partition_path -> 
FilesPartitionRecord)
 /// * `base_file_extension` - The base file format extension (e.g., "parquet", 
"hfile")
 /// * `completion_time_view` - View to look up completion timestamps.
-///   - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for all 
lookups)
-///   - For v8+ tables: Pass [`CompletionTimeView`] built from timeline 
instants
-///
-/// [`V6CompletionTimeView`]: 
crate::timeline::completion_time::V6CompletionTimeView
-/// [`CompletionTimeView`]: 
crate::timeline::completion_time::CompletionTimeView
 ///
 /// # Returns
 /// A map of partition paths to their FileGroups.
-pub fn file_groups_from_files_partition_records<V: 
TimelineViewByCompletionTime>(
+pub fn file_groups_from_files_partition_records<V: CompletionTimeView>(
     records: &HashMap<String, FilesPartitionRecord>,
     base_file_extension: &str,
     completion_time_view: &V,
@@ -190,7 +180,7 @@ pub fn file_groups_from_files_partition_records<V: TimelineViewByCompletionTime>
                 })?;
 
                 log_file.set_completion_time(completion_time_view);
-                // Filter uncommitted files for v8+ tables
+                // Filter uncommitted files for timeline layout v2
                 if completion_time_view.should_filter_uncommitted()
                     && log_file.completion_timestamp.is_none()
                 {
@@ -211,7 +201,7 @@ pub fn file_groups_from_files_partition_records<V: TimelineViewByCompletionTime>
                 })?;
 
                 base_file.set_completion_time(completion_time_view);
-                // Filter uncommitted files for v8+ tables
+                // Filter uncommitted files for timeline layout v2
                 if completion_time_view.should_filter_uncommitted()
                     && base_file.completion_timestamp.is_none()
                 {
@@ -285,8 +275,34 @@ mod tests {
 
     mod test_file_groups_from_commit_metadata {
         use super::super::*;
-        use crate::timeline::completion_time::{CompletionTimeView, 
V6CompletionTimeView};
+        use crate::config::HudiConfigs;
+        use crate::timeline::instant::{Action, Instant, State};
+        use crate::timeline::view::TimelineView;
         use serde_json::{json, Map, Value};
+        use std::collections::HashSet;
+        use std::sync::Arc;
+
+        fn create_layout_v1_view() -> TimelineView {
+            let configs = 
Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "1")]));
+            TimelineView::new(
+                "99999999999999999".to_string(),
+                None,
+                &[] as &[Instant],
+                HashSet::new(),
+                &configs,
+            )
+        }
+
+        fn create_layout_v2_view(instants: &[Instant]) -> TimelineView {
+            let configs = 
Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "2")]));
+            TimelineView::new(
+                "99999999999999999".to_string(),
+                None,
+                instants,
+                HashSet::new(),
+                &configs,
+            )
+        }
 
         #[test]
         fn test_missing_partition_to_write_stats() {
@@ -298,8 +314,8 @@ mod tests {
             .unwrap()
             .clone();
 
-            // Use V6CompletionTimeView for v6 table behavior (no completion 
time tracking)
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            // Use layout v1 view (no completion time tracking)
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             // With the new implementation, this returns Ok with an empty 
HashSet
             // because iter_write_stats() returns an empty iterator when 
partition_to_write_stats is None
             assert!(result.is_ok());
@@ -317,7 +333,7 @@ mod tests {
             .unwrap()
             .clone();
 
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             assert!(matches!(
                 result,
                 Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to 
parse commit metadata")
@@ -337,7 +353,7 @@ mod tests {
             .unwrap()
             .clone();
 
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             assert!(matches!(
                 result,
                 Err(CoreError::CommitMetadata(msg)) if msg == "Missing fileId 
in write stats"
@@ -357,7 +373,7 @@ mod tests {
             .unwrap()
             .clone();
 
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             assert!(matches!(
                 result,
                 Err(CoreError::CommitMetadata(msg)) if msg == "Missing path in 
write stats"
@@ -378,7 +394,7 @@ mod tests {
             .unwrap()
             .clone();
 
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             assert!(matches!(
                 result,
                 Err(CoreError::CommitMetadata(msg)) if msg == "Invalid file 
name in path"
@@ -399,7 +415,7 @@ mod tests {
             .unwrap()
             .clone();
 
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             // Serde will fail to parse this and return a deserialization error
             assert!(matches!(
                 result,
@@ -421,7 +437,7 @@ mod tests {
             .unwrap()
             .clone();
 
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             // Serde will fail to parse this and return a deserialization error
             assert!(matches!(
                 result,
@@ -445,7 +461,7 @@ mod tests {
         }"#;
 
             let metadata: Map<String, Value> = 
serde_json::from_str(sample_json).unwrap();
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             assert!(result.is_ok());
             let file_groups = result.unwrap();
             assert_eq!(file_groups.len(), 2);
@@ -475,7 +491,7 @@ mod tests {
         }"#;
 
             let metadata: Map<String, Value> = 
serde_json::from_str(sample_json).unwrap();
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             assert!(result.is_ok());
             let file_groups = result.unwrap();
             assert_eq!(file_groups.len(), 1);
@@ -507,7 +523,7 @@ mod tests {
         }"#;
 
             let metadata: Map<String, Value> = 
serde_json::from_str(sample_json).unwrap();
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             assert!(result.is_ok());
             let file_groups = result.unwrap();
             assert_eq!(file_groups.len(), 1);
@@ -531,7 +547,7 @@ mod tests {
         }"#;
 
             let metadata: Map<String, Value> = 
serde_json::from_str(sample_json).unwrap();
-            let result = file_groups_from_commit_metadata(&metadata, 
&V6CompletionTimeView::new());
+            let result = file_groups_from_commit_metadata(&metadata, 
&create_layout_v1_view());
             assert!(result.is_ok());
             let file_groups = result.unwrap();
             assert_eq!(file_groups.len(), 1);
@@ -555,7 +571,6 @@ mod tests {
             let metadata: Map<String, Value> = 
serde_json::from_str(sample_json).unwrap();
 
             // Create a completion time view with the mapping for this file's 
request timestamp
-            use crate::timeline::instant::{Action, Instant, State};
             let instants = vec![Instant {
                 timestamp: "20240418173200000".to_string(),
                 completion_timestamp: Some("20240418173210000".to_string()),
@@ -563,7 +578,7 @@ mod tests {
                 state: State::Completed,
                 epoch_millis: 0,
             }];
-            let view = CompletionTimeView::from_instants(&instants);
+            let view = create_layout_v2_view(&instants);
 
             let result = file_groups_from_commit_metadata(&metadata, &view);
             assert!(result.is_ok());
@@ -581,12 +596,36 @@ mod tests {
 
     mod test_file_groups_from_files_partition_records {
         use super::super::*;
+        use crate::config::HudiConfigs;
         use crate::metadata::table_record::{
             FilesPartitionRecord, HoodieMetadataFileInfo, MetadataRecordType,
         };
-        use crate::timeline::completion_time::{CompletionTimeView, 
V6CompletionTimeView};
         use crate::timeline::instant::{Action, Instant, State};
-        use std::collections::HashMap;
+        use crate::timeline::view::TimelineView;
+        use std::collections::{HashMap, HashSet};
+        use std::sync::Arc;
+
+        fn create_layout_v1_view() -> TimelineView {
+            let configs = 
Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "1")]));
+            TimelineView::new(
+                "99999999999999999".to_string(),
+                None,
+                &[] as &[Instant],
+                HashSet::new(),
+                &configs,
+            )
+        }
+
+        fn create_layout_v2_view(instants: &[Instant]) -> TimelineView {
+            let configs = 
Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "2")]));
+            TimelineView::new(
+                "99999999999999999".to_string(),
+                None,
+                instants,
+                HashSet::new(),
+                &configs,
+            )
+        }
 
         fn create_file_info(name: &str, size: i64, is_deleted: bool) -> 
HoodieMetadataFileInfo {
             HoodieMetadataFileInfo::new(name.to_string(), size, is_deleted)
@@ -616,9 +655,9 @@ mod tests {
                 files_map.insert(partition.to_string(), 
create_file_info(partition, 0, false));
             }
             (
-                "__all_partitions__".to_string(),
+                FilesPartitionRecord::ALL_PARTITIONS_KEY.to_string(),
                 FilesPartitionRecord {
-                    key: "__all_partitions__".to_string(),
+                    key: FilesPartitionRecord::ALL_PARTITIONS_KEY.to_string(),
                     record_type: MetadataRecordType::AllPartitions,
                     files: files_map,
                 },
@@ -631,7 +670,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -647,7 +686,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -670,7 +709,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -703,7 +742,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -739,7 +778,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -765,7 +804,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -793,7 +832,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -819,7 +858,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_err());
             let err = result.unwrap_err();
@@ -845,7 +884,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_err());
             let err = result.unwrap_err();
@@ -855,7 +894,7 @@ mod tests {
         }
 
         #[test]
-        fn test_v8_uncommitted_base_file_filtered() {
+        fn test_layout_v2_uncommitted_base_file_filtered() {
             let mut records = HashMap::new();
             // A base file with timestamp that is NOT in the completion view 
(uncommitted)
             let (key, record) = create_files_record(
@@ -865,7 +904,7 @@ mod tests {
             records.insert(key, record);
 
             // Create a non-empty completion view that does NOT include the 
file's timestamp
-            // This simulates an uncommitted file in v8 tables
+            // This simulates an uncommitted file in timeline layout v2
             // The view has OTHER commits, so should_filter_uncommitted() 
returns true
             let instants = vec![Instant {
                 timestamp: "20240418173199999".to_string(), // Different 
timestamp
@@ -874,7 +913,7 @@ mod tests {
                 state: State::Completed,
                 epoch_millis: 0,
             }];
-            let view = CompletionTimeView::from_instants(&instants);
+            let view = create_layout_v2_view(&instants);
 
             let result = file_groups_from_files_partition_records(&records, 
"parquet", &view);
             assert!(result.is_ok());
@@ -886,10 +925,10 @@ mod tests {
         }
 
         #[test]
-        fn test_empty_completion_view_no_filtering() {
-            // Empty completion view means no completion time data available
-            // In this case, should_filter_uncommitted() returns false
-            // and all files are included (v6 behavior)
+        fn test_empty_completion_view_filters_uncommitted() {
+            // For timeline layout v2 with no completion time data (empty 
instants),
+            // all files are filtered because their request timestamps don't 
have
+            // corresponding completion timestamps in the view.
             let mut records = HashMap::new();
             let (key, record) = create_files_record(
                 "partition1",
@@ -897,18 +936,18 @@ mod tests {
             );
             records.insert(key, record);
 
-            let view = CompletionTimeView::from_instants(&[]);
+            let view = create_layout_v2_view(&[]);
 
             let result = file_groups_from_files_partition_records(&records, 
"parquet", &view);
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
 
-            // With empty view, no filtering happens
-            assert_eq!(file_groups_map.len(), 1);
+            // Layout v2 with empty view filters all files (no completion 
timestamps available)
+            assert!(file_groups_map.is_empty());
         }
 
         #[test]
-        fn test_v8_committed_base_file_included() {
+        fn test_layout_v2_committed_base_file_included() {
             let mut records = HashMap::new();
             let (key, record) = create_files_record(
                 "partition1",
@@ -924,7 +963,7 @@ mod tests {
                 state: State::Completed,
                 epoch_millis: 0,
             }];
-            let view = CompletionTimeView::from_instants(&instants);
+            let view = create_layout_v2_view(&instants);
 
             let result = file_groups_from_files_partition_records(&records, 
"parquet", &view);
             assert!(result.is_ok());
@@ -943,7 +982,7 @@ mod tests {
         }
 
         #[test]
-        fn test_v8_uncommitted_log_file_filtered() {
+        fn test_layout_v2_uncommitted_log_file_filtered() {
             let mut records = HashMap::new();
             // Log file has a DIFFERENT base commit timestamp 
(20240418173205000)
             // that is NOT in the completion view
@@ -967,7 +1006,7 @@ mod tests {
                 state: State::Completed,
                 epoch_millis: 0,
             }];
-            let view = CompletionTimeView::from_instants(&instants);
+            let view = create_layout_v2_view(&instants);
 
             let result = file_groups_from_files_partition_records(&records, 
"parquet", &view);
             assert!(result.is_ok());
@@ -984,7 +1023,7 @@ mod tests {
         }
 
         #[test]
-        fn test_v8_committed_log_file_included() {
+        fn test_layout_v2_committed_log_file_included() {
             let mut records = HashMap::new();
             let (key, record) = create_files_record(
                 "partition1",
@@ -1006,7 +1045,7 @@ mod tests {
                 state: State::Completed,
                 epoch_millis: 0,
             }];
-            let view = CompletionTimeView::from_instants(&instants);
+            let view = create_layout_v2_view(&instants);
 
             let result = file_groups_from_files_partition_records(&records, 
"parquet", &view);
             assert!(result.is_ok());
@@ -1038,7 +1077,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -1063,7 +1102,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -1091,7 +1130,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "parquet",
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
@@ -1112,7 +1151,7 @@ mod tests {
             let result = file_groups_from_files_partition_records(
                 &records,
                 "hfile", // Different base file extension
-                &V6CompletionTimeView::new(),
+                &create_layout_v1_view(),
             );
             assert!(result.is_ok());
             let file_groups_map = result.unwrap();
diff --git a/crates/core/src/file_group/log_file/content.rs b/crates/core/src/file_group/log_file/content.rs
index 50ce84c..11e3eb1 100644
--- a/crates/core/src/file_group/log_file/content.rs
+++ b/crates/core/src/file_group/log_file/content.rs
@@ -232,7 +232,7 @@ impl Decoder {
     ///
     /// HFile blocks are used in metadata table log files. Unlike Avro/Parquet 
blocks,
     /// the content is NOT converted to Arrow RecordBatch because:
-    /// - MDT operations need key-based lookup/merge
+    /// - Metadata table operations need key-based lookup/merge
     /// - Values are Avro-serialized payloads decoded on demand
     ///
     /// The HFile content structure:
diff --git a/crates/core/src/file_group/log_file/mod.rs b/crates/core/src/file_group/log_file/mod.rs
index c8ce94e..9b634f7 100644
--- a/crates/core/src/file_group/log_file/mod.rs
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -18,7 +18,7 @@
  */
 use crate::error::CoreError;
 use crate::storage::file_metadata::FileMetadata;
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
+use crate::timeline::completion_time::CompletionTimeView;
 use crate::Result;
 use std::cmp::Ordering;
 use std::fmt::Display;
@@ -148,7 +148,7 @@ impl LogFile {
     ///
     /// For v6 tables, the view returns `None` and this is a no-op.
     /// For v8+ tables, this sets the completion timestamp for completed 
commits.
-    pub fn set_completion_time<V: TimelineViewByCompletionTime>(&mut self, 
view: &V) {
+    pub fn set_completion_time<V: CompletionTimeView>(&mut self, view: &V) {
         self.completion_timestamp = view
             .get_completion_time(&self.timestamp)
             .map(|s| s.to_string());
diff --git a/crates/core/src/file_group/log_file/scanner.rs b/crates/core/src/file_group/log_file/scanner.rs
index 1614023..90df710 100644
--- a/crates/core/src/file_group/log_file/scanner.rs
+++ b/crates/core/src/file_group/log_file/scanner.rs
@@ -304,7 +304,9 @@ mod tests {
     use crate::config::HudiConfigs;
     use crate::file_group::record_batches::RecordBatches;
     use crate::hfile::HFileReader;
-    use 
crate::metadata::table_record::decode_files_partition_record_with_schema;
+    use crate::metadata::table_record::{
+        decode_files_partition_record_with_schema, FilesPartitionRecord,
+    };
     use crate::storage::util::parse_uri;
     use apache_avro::Schema as AvroSchema;
     use hudi_test::QuickstartTripsTable;
@@ -511,7 +513,7 @@ mod tests {
 
         // Validate __all_partitions__ records
         let all_partitions = records_by_key
-            .get("__all_partitions__")
+            .get(FilesPartitionRecord::ALL_PARTITIONS_KEY)
             .expect("Should have __all_partitions__ records");
         assert_eq!(
             all_partitions.len(),
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 0551eeb..063792e 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -26,7 +26,7 @@ use crate::expr::filter::{Filter, SchemableFilter};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::log_file::scanner::{LogFileScanner, ScanResult};
 use crate::file_group::record_batches::RecordBatches;
-use crate::hfile::HFileReader;
+use crate::hfile::{HFileReader, HFileRecord};
 use crate::merge::record_merger::RecordMerger;
 use crate::metadata::merger::FilesPartitionMerger;
 use crate::metadata::meta_field::MetaField;
@@ -340,50 +340,32 @@ impl FileGroupReader {
         crate::util::path::is_metadata_table_path(&base_path)
     }
 
-    /// Reads a metadata table file slice and returns merged 
FilesPartitionRecords.
-    ///
-    /// This method is specifically designed for reading the metadata table's 
`files` partition,
-    /// which uses HFile format for base files and HFile blocks in log files.
+    /// Read records from metadata table files partition.
     ///
     /// # Arguments
-    /// * `base_file_path` - Relative path to the HFile base file
-    /// * `log_file_paths` - Relative paths to log files
+    /// * `file_slice` - The file slice to read from
+    /// * `keys` - Only read records with these keys. If empty, reads all 
records.
     ///
     /// # Returns
-    /// A HashMap mapping record keys (partition paths or 
"__all_partitions__") to
-    /// merged `FilesPartitionRecord`s containing file information.
-    ///
-    /// # Example
-    /// ```ignore
-    /// let reader = 
FileGroupReader::new_with_options(metadata_table_base_uri, options)?;
-    /// let merged = reader.read_file_slice_from_metadata_table_paths(
-    ///     "files/files-0000-0_0-0-0_00000000000000.hfile",
-    ///     vec!["files/.files-0000-0_20240101120000000.log.1_0-100-200"],
-    /// ).await?;
-    ///
-    /// // Get file listing for a partition
-    /// if let Some(record) = merged.get("city=chennai") {
-    ///     for file_name in record.active_file_names() {
-    ///         println!("Active file: {}", file_name);
-    ///     }
-    /// }
-    /// ```
-    pub async fn read_file_slice_from_metadata_table_paths<I, S>(
+    /// HashMap containing the requested keys (or all keys if `keys` is empty).
+    pub(crate) async fn read_metadata_table_files_partition(
         &self,
-        base_file_path: &str,
-        log_file_paths: I,
-    ) -> Result<HashMap<String, FilesPartitionRecord>>
-    where
-        I: IntoIterator<Item = S>,
-        S: AsRef<str>,
-    {
-        let log_file_paths: Vec<String> = log_file_paths
-            .into_iter()
-            .map(|s| s.as_ref().to_string())
-            .collect();
+        file_slice: &FileSlice,
+        keys: &[&str],
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
 
-        // Read HFile base file using async open
-        let mut hfile_reader = HFileReader::open(&self.storage, base_file_path)
+        // Open HFile
+        let mut hfile_reader = HFileReader::open(&self.storage, 
&base_file_path)
             .await
             .map_err(|e| {
                 ReadFileSliceError(format!(
@@ -399,10 +381,20 @@ impl FileGroupReader {
             .ok_or_else(|| ReadFileSliceError("No Avro schema found in 
HFile".to_string()))?
             .clone();
 
-        // Collect base records
-        let base_records = hfile_reader
-            .collect_records()
-            .map_err(|e| ReadFileSliceError(format!("Failed to collect HFile 
records: {:?}", e)))?;
+        // Read base records: all if keys is empty, targeted lookup otherwise
+        let base_records: Vec<HFileRecord> = if keys.is_empty() {
+            hfile_reader.collect_records().map_err(|e| {
+                ReadFileSliceError(format!("Failed to collect HFile records: 
{:?}", e))
+            })?
+        } else {
+            let lookup_results = hfile_reader.lookup_records(keys).map_err(|e| 
{
+                ReadFileSliceError(format!("Failed to lookup HFile records: 
{:?}", e))
+            })?;
+            lookup_results
+                .into_iter()
+                .filter_map(|(_, opt_record)| opt_record)
+                .collect()
+        };
 
         // Scan log files if present
         let log_records = if log_file_paths.is_empty() {
@@ -424,59 +416,9 @@ impl FileGroupReader {
             }
         };
 
-        // Merge base and log records
+        // Merge records (key filtering is applied internally when keys is 
non-empty)
         let merger = FilesPartitionMerger::new(schema);
-        merger.merge(&base_records, &log_records)
-    }
-
-    /// Same as [FileGroupReader::read_file_slice_from_metadata_table_paths], 
but blocking.
-    pub fn read_file_slice_from_metadata_table_paths_blocking<I, S>(
-        &self,
-        base_file_path: &str,
-        log_file_paths: I,
-    ) -> Result<HashMap<String, FilesPartitionRecord>>
-    where
-        I: IntoIterator<Item = S>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(
-                self.read_file_slice_from_metadata_table_paths(base_file_path, 
log_file_paths),
-            )
-    }
-
-    /// Reads a metadata table file slice using a FileSlice object.
-    ///
-    /// Convenience wrapper around 
[FileGroupReader::read_file_slice_from_metadata_table_paths].
-    pub async fn read_file_slice_from_metadata_table(
-        &self,
-        file_slice: &FileSlice,
-    ) -> Result<HashMap<String, FilesPartitionRecord>> {
-        let base_file_path = file_slice.base_file_relative_path()?;
-        let log_file_paths = if file_slice.has_log_file() {
-            file_slice
-                .log_files
-                .iter()
-                .map(|log_file| file_slice.log_file_relative_path(log_file))
-                .collect::<Result<Vec<String>>>()?
-        } else {
-            vec![]
-        };
-        self.read_file_slice_from_metadata_table_paths(&base_file_path, 
log_file_paths)
-            .await
-    }
-
-    /// Same as [FileGroupReader::read_file_slice_from_metadata_table], but 
blocking.
-    pub fn read_file_slice_from_metadata_table_blocking(
-        &self,
-        file_slice: &FileSlice,
-    ) -> Result<HashMap<String, FilesPartitionRecord>> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(self.read_file_slice_from_metadata_table(file_slice))
+        merger.merge_for_keys(&base_records, &log_records, keys)
     }
 }
 
@@ -841,151 +783,74 @@ mod tests {
         "files/.files-0000-0_20251220210130911.log.1_3-1149-3338",
     ];
 
-    #[test]
-    fn test_read_file_slice_from_metadata_table_paths_without_log_files() -> 
Result<()> {
-        use crate::metadata::table_record::MetadataRecordType;
-
-        let reader = create_metadata_table_reader()?;
-
-        // Read base file only (no log files)
-        let log_files: Vec<&str> = vec![];
-        let merged = reader.read_file_slice_from_metadata_table_paths_blocking(
-            METADATA_TABLE_FILES_BASE_FILE,
-            log_files,
-        )?;
-
-        // Initial base file only has __all_partitions__ record
-        // City partition records are added through log files
-        assert_eq!(merged.len(), 1, "Base file should have 1 key");
-        assert!(merged.contains_key("__all_partitions__"));
+    fn create_test_file_slice() -> Result<FileSlice> {
+        use crate::file_group::FileGroup;
 
-        // Validate __all_partitions__ record
-        let all_parts = merged.get("__all_partitions__").unwrap();
-        assert_eq!(all_parts.record_type, MetadataRecordType::AllPartitions);
+        let mut fg = FileGroup::new("files-0000-0".to_string(), 
"files".to_string());
+        let base_file_name = METADATA_TABLE_FILES_BASE_FILE
+            .strip_prefix("files/")
+            .unwrap();
+        fg.add_base_file_from_name(base_file_name)?;
+        let log_file_names: Vec<_> = METADATA_TABLE_FILES_LOG_FILES
+            .iter()
+            .map(|s| s.strip_prefix("files/").unwrap())
+            .collect();
+        fg.add_log_files_from_names(log_file_names)?;
 
-        Ok(())
+        Ok(fg
+            .get_file_slice_as_of("99999999999999999")
+            .expect("Should have file slice")
+            .clone())
     }
 
-    #[test]
-    fn test_read_file_slice_from_metadata_table_paths_with_log_files() -> 
Result<()> {
-        use crate::metadata::table_record::MetadataRecordType;
+    #[tokio::test]
+    async fn test_read_metadata_table_files_partition() -> Result<()> {
+        use crate::metadata::table_record::{FilesPartitionRecord, 
MetadataRecordType};
 
         let reader = create_metadata_table_reader()?;
+        let file_slice = create_test_file_slice()?;
 
-        // Read base file + all log files
-        let merged = reader.read_file_slice_from_metadata_table_paths_blocking(
-            METADATA_TABLE_FILES_BASE_FILE,
-            METADATA_TABLE_FILES_LOG_FILES.to_vec(),
-        )?;
+        // Test 1: Read all records (empty keys)
+        let all_records = reader
+            .read_metadata_table_files_partition(&file_slice, &[])
+            .await?;
 
-        // Should still have 4 keys after merging
-        assert_eq!(merged.len(), 4, "Should have 4 partition keys after 
merge");
+        // Should have 4 keys after merging
+        assert_eq!(
+            all_records.len(),
+            4,
+            "Should have 4 partition keys after merge"
+        );
 
         // Validate all partition keys have correct record types
-        for (key, record) in &merged {
-            if key == "__all_partitions__" {
+        for (key, record) in &all_records {
+            if key == FilesPartitionRecord::ALL_PARTITIONS_KEY {
                 assert_eq!(record.record_type, 
MetadataRecordType::AllPartitions);
             } else {
                 assert_eq!(record.record_type, MetadataRecordType::Files);
             }
         }
 
-        // Expected UUIDs for each partition's files
-        const CHENNAI_UUID: &str = "6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc";
-        const SAN_FRANCISCO_UUID: &str = 
"036ded81-9ed4-479f-bcea-7145dfa0079b";
-        const SAO_PAULO_UUID: &str = "8aa68f7e-afd6-4c94-b86c-8a886552e08d";
-
-        // Validate chennai partition
-        let chennai = merged.get("city=chennai").unwrap();
-        let active_files = chennai.active_file_names();
+        // Validate chennai partition has files
+        let chennai = all_records.get("city=chennai").unwrap();
         assert!(
-            active_files.len() >= 2,
-            "Chennai should have at least 2 active files, got {}",
-            active_files.len()
+            chennai.active_file_names().len() >= 2,
+            "Chennai should have at least 2 active files"
         );
         assert!(chennai.total_size() > 0, "Total size should be > 0");
-        for file_name in &active_files {
-            assert!(
-                file_name.contains(CHENNAI_UUID),
-                "Chennai file should contain UUID: {}",
-                file_name
-            );
-        }
-
-        // Validate san_francisco partition
-        let sf = merged.get("city=san_francisco").unwrap();
-        for file_name in sf.active_file_names() {
-            assert!(
-                file_name.contains(SAN_FRANCISCO_UUID),
-                "San Francisco file should contain UUID: {}",
-                file_name
-            );
-        }
-
-        // Validate sao_paulo partition
-        let sp = merged.get("city=sao_paulo").unwrap();
-        for file_name in sp.active_file_names() {
-            assert!(
-                file_name.contains(SAO_PAULO_UUID),
-                "Sao Paulo file should contain UUID: {}",
-                file_name
-            );
-        }
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_read_file_slice_from_metadata_table_error_handling() -> Result<()> 
{
-        let reader = create_metadata_table_reader()?;
-
-        // Test with non-existent base file
-        let result = reader.read_file_slice_from_metadata_table_paths_blocking(
-            "files/nonexistent.hfile",
-            Vec::<&str>::new(),
-        );
-
-        assert!(result.is_err(), "Should error on non-existent file");
-        let err = result.unwrap_err().to_string();
-        assert!(
-            err.contains("Failed to read metadata table base file"),
-            "Error should mention metadata table base file: {}",
-            err
-        );
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_read_file_slice_from_metadata_table_blocking() -> Result<()> {
-        use crate::file_group::FileGroup;
 
-        let reader = create_metadata_table_reader()?;
-
-        // Build FileGroup using the API
-        let mut fg = FileGroup::new("files-0000-0".to_string(), 
"files".to_string());
-        let base_file_name = METADATA_TABLE_FILES_BASE_FILE
-            .strip_prefix("files/")
-            .unwrap();
-        fg.add_base_file_from_name(base_file_name)?;
-        let log_file_names: Vec<_> = METADATA_TABLE_FILES_LOG_FILES
-            .iter()
-            .map(|s| s.strip_prefix("files/").unwrap())
-            .collect();
-        fg.add_log_files_from_names(log_file_names)?;
-
-        let file_slice = fg
-            .get_file_slice_as_of("99999999999999999")
-            .expect("Should have file slice");
-
-        let merged = 
reader.read_file_slice_from_metadata_table_blocking(file_slice)?;
+        // Test 2: Read specific keys
+        let keys = vec![FilesPartitionRecord::ALL_PARTITIONS_KEY, 
"city=chennai"];
+        let filtered_records = reader
+            .read_metadata_table_files_partition(&file_slice, &keys)
+            .await?;
 
-        // Should have 4 keys: __all_partitions__ + 3 city partitions
-        assert_eq!(merged.len(), 4);
-        assert!(merged.contains_key("__all_partitions__"));
-        assert!(merged.contains_key("city=chennai"));
-        assert!(merged.contains_key("city=san_francisco"));
-        assert!(merged.contains_key("city=sao_paulo"));
+        // Should only contain the requested keys
+        assert_eq!(filtered_records.len(), 2);
+        
assert!(filtered_records.contains_key(FilesPartitionRecord::ALL_PARTITIONS_KEY));
+        assert!(filtered_records.contains_key("city=chennai"));
+        assert!(!filtered_records.contains_key("city=san_francisco"));
+        assert!(!filtered_records.contains_key("city=sao_paulo"));
 
         Ok(())
     }
diff --git a/crates/core/src/hfile/reader.rs b/crates/core/src/hfile/reader.rs
index 3c7d11e..82b56ec 100644
--- a/crates/core/src/hfile/reader.rs
+++ b/crates/core/src/hfile/reader.rs
@@ -879,20 +879,20 @@ impl HFileReader {
         Ok(HFileIterator { reader: self })
     }
 
-    // ================== HFileRecord API for MDT ==================
+    // ================== HFileRecord API for metadata table ==================
 
     /// Convert a KeyValue to an owned HFileRecord.
     ///
     /// This extracts the key content (without length prefix) and value bytes
-    /// into an owned struct suitable for MDT operations.
+    /// into an owned struct suitable for metadata table operations.
     fn key_value_to_record(kv: &KeyValue) -> HFileRecord {
         HFileRecord::new(kv.key().content().to_vec(), kv.value().to_vec())
     }
 
     /// Collect all records from the HFile as owned HFileRecords.
     ///
-    /// This is useful for MDT operations where records need to be stored
-    /// and merged with log file records.
+    /// This is useful for metadata table operations where records need to be
+    /// stored and merged with log file records.
     ///
     /// # Example
     /// ```ignore
@@ -1998,7 +1998,7 @@ mod tests {
     // 3. "city=san_francisco" - 2 parquet files (UUID: 
036ded81-9ed4-479f-bcea-7145dfa0079b)
     // 4. "city=sao_paulo" - 2 parquet files (UUID: 
8aa68f7e-afd6-4c94-b86c-8a886552e08d)
 
-    use crate::metadata::table_record::decode_files_partition_record;
+    use crate::metadata::table_record::{decode_files_partition_record, 
FilesPartitionRecord};
     use hudi_test::QuickstartTripsTable;
 
     /// Get the path to the files partition directory in the test table.
@@ -2062,7 +2062,7 @@ mod tests {
         assert_eq!(
             keys,
             vec![
-                "__all_partitions__",
+                FilesPartitionRecord::ALL_PARTITIONS_KEY,
                 "city=chennai",
                 "city=san_francisco",
                 "city=sao_paulo"
@@ -2100,7 +2100,7 @@ mod tests {
                 .unwrap_or_else(|e| panic!("Failed to decode record for key 
{}: {}", key, e));
 
             match key {
-                "__all_partitions__" => {
+                FilesPartitionRecord::ALL_PARTITIONS_KEY => {
                     // Validate ALL_PARTITIONS record type and partitions
                     assert_eq!(
                         files_record.record_type,
diff --git a/crates/core/src/hfile/record.rs b/crates/core/src/hfile/record.rs
index 4ee6364..b041ed4 100644
--- a/crates/core/src/hfile/record.rs
+++ b/crates/core/src/hfile/record.rs
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-//! HFile record types for MDT (Metadata Table) operations.
+//! HFile record types for metadata table operations.
 //!
 //! This module provides simple, owned record types for HFile key-value pairs.
-//! These are designed for use in MDT operations where:
+//! These are designed for use in metadata table operations where:
 //! - Records need to be passed around and stored
 //! - Key-based lookups and merging are primary operations
 //! - Values are Avro-serialized payloads decoded on demand
@@ -31,8 +31,8 @@ use std::cmp::Ordering;
 
 /// An owned HFile record with key and value.
 ///
-/// This is a simple struct designed for MDT operations. The key is the
-/// UTF-8 record key (content only, without HFile key structure), and
+/// This is a simple struct designed for metadata table operations. The key is
+/// the UTF-8 record key (content only, without HFile key structure), and
 /// the value is the raw bytes (typically Avro-serialized payload).
 ///
 /// # Example
@@ -83,7 +83,7 @@ impl HFileRecord {
 
     /// Returns whether this record represents a deletion.
     ///
-    /// In MDT, a deleted record has an empty value.
+    /// In metadata table, a deleted record has an empty value.
     pub fn is_deleted(&self) -> bool {
         self.value.is_empty()
     }
diff --git a/crates/core/src/metadata/merger.rs 
b/crates/core/src/metadata/merger.rs
index b0afac0..f121099 100644
--- a/crates/core/src/metadata/merger.rs
+++ b/crates/core/src/metadata/merger.rs
@@ -105,6 +105,63 @@ impl FilesPartitionMerger {
         Ok(merged)
     }
 
+    /// Merge records, optionally filtering to specific keys.
+    ///
+    /// When `keys` is empty, processes all records (same as `merge()`).
+    /// When `keys` is non-empty, only processes records matching those keys,
+    /// which is useful for efficient partition pruning.
+    ///
+    /// # Arguments
+    /// * `base_records` - Records from the base HFile
+    /// * `log_records` - Records from log files
+    /// * `keys` - Only process records with these keys. If empty, process all.
+    ///
+    /// # Returns
+    /// A HashMap containing the requested keys (or all if keys is empty).
+    pub fn merge_for_keys(
+        &self,
+        base_records: &[HFileRecord],
+        log_records: &[HFileRecord],
+        keys: &[&str],
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        // When keys is empty, process all records
+        if keys.is_empty() {
+            return self.merge(base_records, log_records);
+        }
+
+        let key_set: std::collections::HashSet<&str> = 
keys.iter().copied().collect();
+        let mut merged: HashMap<String, FilesPartitionRecord> = HashMap::new();
+
+        // Process base records, filtering by key
+        for record in base_records {
+            if let Some(key_str) = record.key_as_str() {
+                if key_set.contains(key_str) {
+                    let decoded = self.decode_record(record)?;
+                    merged.insert(decoded.key.clone(), decoded);
+                }
+            }
+        }
+
+        // Process log records, filtering by key
+        for record in log_records {
+            if let Some(key_str) = record.key_as_str() {
+                if key_set.contains(key_str) {
+                    let decoded = self.decode_record(record)?;
+                    match merged.get_mut(&decoded.key) {
+                        Some(existing) => {
+                            self.merge_files_partition_records(existing, 
&decoded);
+                        }
+                        None => {
+                            merged.insert(decoded.key.clone(), decoded);
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(merged)
+    }
+
     /// Decode an HFile record using the schema.
     fn decode_record(&self, record: &HFileRecord) -> 
Result<FilesPartitionRecord> {
         decode_files_partition_record_with_schema(record, &self.schema)
@@ -159,7 +216,7 @@ impl FilesPartitionMerger {
 mod tests {
     use super::*;
     use crate::hfile::HFileReader;
-    use crate::metadata::table_record::MetadataRecordType;
+    use crate::metadata::table_record::{FilesPartitionRecord, 
MetadataRecordType};
     use hudi_test::QuickstartTripsTable;
     use std::path::PathBuf;
 
@@ -422,7 +479,7 @@ mod tests {
 
         // Validate __all_partitions__ - should list all 3 city partitions
         let all_partitions = merged
-            .get("__all_partitions__")
+            .get(FilesPartitionRecord::ALL_PARTITIONS_KEY)
             .expect("Should have __all_partitions__");
         assert_eq!(
             all_partitions.record_type,
@@ -533,4 +590,45 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_merge_for_keys_filters_to_requested_keys() -> crate::Result<()> {
+        // Get base records from the HFile
+        let dir = files_partition_dir();
+        let mut hfiles: Vec<_> = std::fs::read_dir(&dir)
+            .unwrap()
+            .filter_map(|e| e.ok())
+            .filter(|e| {
+                e.path()
+                    .extension()
+                    .map(|ext| ext == "hfile")
+                    .unwrap_or(false)
+            })
+            .collect();
+        hfiles.sort_by_key(|e| e.file_name());
+
+        let hfile_path = hfiles.last().expect("No HFile found").path();
+        let bytes = std::fs::read(&hfile_path).expect("Failed to read HFile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create 
HFileReader");
+        let schema = reader
+            .get_avro_schema()
+            .expect("Failed to get schema")
+            .expect("No schema in HFile")
+            .clone();
+        let base_records = reader.collect_records().expect("Failed to collect 
records");
+
+        let merger = FilesPartitionMerger::new(schema);
+
+        // Request only specific keys
+        let keys = vec![FilesPartitionRecord::ALL_PARTITIONS_KEY, 
"city=chennai"];
+        let merged = merger.merge_for_keys(&base_records, &[], &keys)?;
+
+        // Should only contain requested keys
+        assert_eq!(merged.len(), 2);
+        assert!(merged.contains_key(FilesPartitionRecord::ALL_PARTITIONS_KEY));
+        assert!(merged.contains_key("city=chennai"));
+        assert!(!merged.contains_key("city=san_francisco"));
+
+        Ok(())
+    }
 }
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
index e80a49f..52a64a6 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/metadata/mod.rs
@@ -20,7 +20,8 @@ pub mod commit;
 pub mod merger;
 pub mod meta_field;
 pub mod replace_commit;
-pub mod table_record;
+pub mod table;
+pub use table::records as table_record;
 
 pub const HUDI_METADATA_DIR: &str = ".hoodie";
 pub const DELTALAKE_METADATA_DIR: &str = "_delta_log";
diff --git a/crates/core/src/metadata/table/mod.rs 
b/crates/core/src/metadata/table/mod.rs
new file mode 100644
index 0000000..9d81cd0
--- /dev/null
+++ b/crates/core/src/metadata/table/mod.rs
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Metadata table APIs for the Hudi Table.
+//!
+//! This module provides methods for interacting with Hudi's metadata table,
+//! which stores file listings and other metadata for efficient table 
operations.
+
+pub mod records;
+
+use std::collections::HashMap;
+
+use crate::config::read::HudiReadConfig;
+use crate::config::table::HudiTableConfig::{
+    MetadataTableEnabled, MetadataTablePartitions, PartitionFields, 
TableVersion,
+};
+use crate::error::CoreError;
+use crate::expr::filter::from_str_tuples;
+use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
+use crate::storage::util::join_url_segments;
+use crate::table::partition::PartitionPruner;
+use crate::table::Table;
+use crate::Result;
+
+use records::FilesPartitionRecord;
+
+impl Table {
+    /// Check if this table is a metadata table.
+    ///
+    /// Detection is based on the base path ending with `.hoodie/metadata`.
+    pub fn is_metadata_table(&self) -> bool {
+        let base_path: String = self
+            .hudi_configs
+            .get_or_default(crate::config::table::HudiTableConfig::BasePath)
+            .into();
+        crate::util::path::is_metadata_table_path(&base_path)
+    }
+
+    /// Get the list of available metadata table partitions for this table.
+    ///
+    /// Returns the partitions configured in [`MetadataTablePartitions`].
+    pub fn get_metadata_table_partitions(&self) -> Vec<String> {
+        self.hudi_configs
+            .get_or_default(MetadataTablePartitions)
+            .into()
+    }
+
+    /// Check if the metadata table is enabled.
+    ///
+    /// Returns `true` if:
+    /// 1. Table version is >= 8 (metadata table support is only for v8+ 
tables), AND
+    /// 2. Table is not a metadata table itself, AND
+    /// 3. Either:
+    ///    - `hoodie.metadata.enable` is explicitly true, OR
+    ///    - `files` is in the configured [`MetadataTablePartitions`]
+    ///
+    /// # Note
+    /// The metadata table is considered active when partitions are configured,
+    /// even without explicit `hoodie.metadata.enable=true`. When metadata 
table
+    /// is enabled, it must have at least the `files` partition enabled.
+    pub fn is_metadata_table_enabled(&self) -> bool {
+        // TODO: drop v6 support then no need to check table version here
+        let table_version: isize = self
+            .hudi_configs
+            .get(TableVersion)
+            .map(|v| v.into())
+            .unwrap_or(0);
+
+        if table_version < 8 {
+            return false;
+        }
+
+        if self.is_metadata_table() {
+            return false;
+        }
+
+        // Check if "files" partition is configured
+        let has_files_partition = self
+            .get_metadata_table_partitions()
+            .contains(&FilesPartitionRecord::PARTITION_NAME.to_string());
+
+        // Explicit check for hoodie.metadata.enable
+        let metadata_explicitly_enabled: bool = self
+            .hudi_configs
+            .get_or_default(MetadataTableEnabled)
+            .into();
+
+        metadata_explicitly_enabled || has_files_partition
+    }
+
+    /// Create a metadata table instance for this data table.
+    ///
+    /// TODO: support more partitions. Only "files" is used currently.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the metadata table cannot be created or if there 
are
+    /// no metadata table partitions configured.
+    ///
+    /// # Note
+    /// Must be called on a DATA table, not a METADATA table.
+    pub async fn new_metadata_table(&self) -> Result<Table> {
+        if self.is_metadata_table() {
+            return Err(CoreError::MetadataTable(
+                "Cannot create metadata table from another metadata 
table".to_string(),
+            ));
+        }
+
+        let mdt_partitions = self.get_metadata_table_partitions();
+        if mdt_partitions.is_empty() {
+            return Err(CoreError::MetadataTable(
+                "No metadata table partitions configured".to_string(),
+            ));
+        }
+
+        let mdt_url = join_url_segments(&self.base_url(), &[".hoodie", 
"metadata"])?;
+        Table::new_with_options(
+            mdt_url.as_str(),
+            [(PartitionFields.as_ref(), METADATA_TABLE_PARTITION_FIELD)],
+        )
+        .await
+    }
+
+    /// Same as [Table::new_metadata_table], but blocking.
+    pub fn new_metadata_table_blocking(&self) -> Result<Table> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { self.new_metadata_table().await })
+    }
+
+    /// Fetch records from the `files` partition of metadata table
+    /// with optional data table partition pruning.
+    ///
+    /// # Note
+    /// Must be called on a DATA table, not a METADATA table.
+    pub async fn read_metadata_table_files_partition(
+        &self,
+        partition_pruner: &PartitionPruner,
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        let metadata_table = self.new_metadata_table().await?;
+        metadata_table
+            .fetch_files_partition_records(partition_pruner)
+            .await
+    }
+
+    /// Same as [Table::read_metadata_table_files_partition], but blocking.
+    pub fn read_metadata_table_files_partition_blocking(
+        &self,
+        partition_pruner: &PartitionPruner,
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            
.block_on(self.read_metadata_table_files_partition(partition_pruner))
+    }
+
+    /// Fetch records from the `files` partition with optional partition 
pruning.
+    ///
+    /// # Arguments
+    /// * `partition_pruner` - Data table's partition pruner to filter 
partitions.
+    ///
+    /// # Note
+    /// Must be called on a METADATA table instance.
+    pub async fn fetch_files_partition_records(
+        &self,
+        partition_pruner: &PartitionPruner,
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        // If no partition filters, read all records (pass empty keys)
+        if partition_pruner.is_empty() {
+            return self.read_files_partition(&[]).await;
+        }
+
+        // Step 1: Get all partition paths
+        let all_partitions_records = self
+            .read_files_partition(&[FilesPartitionRecord::ALL_PARTITIONS_KEY])
+            .await?;
+
+        let partition_names: Vec<&str> = all_partitions_records
+            .get(FilesPartitionRecord::ALL_PARTITIONS_KEY)
+            .map(|r| r.partition_names())
+            .unwrap_or_default();
+
+        // Step 2: Apply partition pruning
+        let pruned: Vec<&str> = partition_names
+            .into_iter()
+            .filter(|p| partition_pruner.should_include(p))
+            .collect();
+
+        if pruned.is_empty() {
+            return Ok(HashMap::new());
+        }
+
+        // Step 3: Read only the pruned partition records
+        self.read_files_partition(&pruned).await
+    }
+
+    /// Read records from the `files` partition.
+    ///
+    /// If keys is empty, reads all records. Otherwise, reads only the 
specified keys.
+    ///
+    /// # Note
+    /// Must be called on a METADATA table instance.
+    async fn read_files_partition(
+        &self,
+        keys: &[&str],
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() else {
+            return Ok(HashMap::new());
+        };
+
+        let timeline_view = self.timeline.create_view_as_of(timestamp).await?;
+
+        let filters = from_str_tuples([(
+            METADATA_TABLE_PARTITION_FIELD,
+            "=",
+            FilesPartitionRecord::PARTITION_NAME,
+        )])?;
+        let partition_schema = self.get_partition_schema().await?;
+        let partition_pruner =
+            PartitionPruner::new(&filters, &partition_schema, 
self.hudi_configs.as_ref())?;
+
+        let file_slices = self
+            .file_system_view
+            .get_file_slices_by_storage_listing(&partition_pruner, 
&timeline_view)
+            .await?;
+
+        if file_slices.len() != 1 {
+            return Err(CoreError::MetadataTable(format!(
+                "Expected 1 file slice for {} partition, got {}",
+                FilesPartitionRecord::PARTITION_NAME,
+                file_slices.len()
+            )));
+        }
+
+        let file_slice = file_slices.into_iter().next().unwrap();
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp,
+        )])?;
+
+        fg_reader
+            .read_metadata_table_files_partition(&file_slice, keys)
+            .await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config::table::HudiTableConfig::TableVersion;
+    use crate::table::partition::PartitionPruner;
+    use hudi_test::{QuickstartTripsTable, SampleTable};
+    use records::{FilesPartitionRecord, MetadataRecordType};
+    use std::collections::HashSet;
+
+    fn get_data_table() -> Table {
+        let table_path = 
QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
+        Table::new_blocking(&table_path).unwrap()
+    }
+
+    #[test]
+    fn hudi_table_read_metadata_table_files_partition() {
+        let data_table = get_data_table();
+        let partition_schema = 
data_table.get_partition_schema_blocking().unwrap();
+        let partition_pruner =
+            PartitionPruner::new(&[], &partition_schema, 
data_table.hudi_configs.as_ref()).unwrap();
+
+        let records = data_table
+            .read_metadata_table_files_partition_blocking(&partition_pruner)
+            .unwrap();
+
+        // Should have 4 records: __all_partitions__ + 3 city partitions
+        assert_eq!(records.len(), 4);
+
+        // Validate __all_partitions__ record
+        let all_partitions = records
+            .get(FilesPartitionRecord::ALL_PARTITIONS_KEY)
+            .unwrap();
+        assert_eq!(
+            all_partitions.record_type,
+            MetadataRecordType::AllPartitions
+        );
+        let partition_names: HashSet<&str> = 
all_partitions.partition_names().into_iter().collect();
+        assert_eq!(
+            partition_names,
+            HashSet::from(["city=chennai", "city=san_francisco", 
"city=sao_paulo"])
+        );
+
+        // Validate city=chennai record with actual file names
+        let chennai = records.get("city=chennai").unwrap();
+        assert_eq!(chennai.record_type, MetadataRecordType::Files);
+        let chennai_files: HashSet<_> = 
chennai.active_file_names().into_iter().collect();
+        assert_eq!(
+            chennai_files,
+            HashSet::from([
+                
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_2-986-2794_20251220210108078.parquet",
+                
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_0-1112-3190_20251220210129235.parquet",
+                
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210127080.log.1_0-1072-3078",
+                
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210128625.log.1_0-1097-3150",
+            ])
+        );
+        assert!(chennai.total_size() > 0);
+    }
+
+    #[test]
+    fn hudi_table_get_metadata_table_partitions() {
+        let data_table = get_data_table();
+
+        // Verify we can get the metadata table partitions from the data table
+        let partitions = data_table.get_metadata_table_partitions();
+
+        // The test table has 5 metadata table partitions configured
+        assert_eq!(
+            partitions.len(),
+            5,
+            "Should have 5 metadata table partitions, got: {:?}",
+            partitions
+        );
+
+        // Verify all expected partitions are present
+        let expected = [
+            "column_stats",
+            "files",
+            "partition_stats",
+            "record_index",
+            "secondary_index_rider_idx",
+        ];
+        for partition in &expected {
+            assert!(
+                partitions.contains(&partition.to_string()),
+                "Should contain '{}' partition, got: {:?}",
+                partition,
+                partitions
+            );
+        }
+    }
+
+    #[test]
+    fn hudi_table_is_metadata_table_enabled() {
+        // V8 table with files partition configured should enable metadata 
table
+        // even without explicit hoodie.metadata.enable=true
+        let data_table = get_data_table();
+
+        // Verify it's a v8 table
+        let table_version: isize = data_table
+            .hudi_configs
+            .get(TableVersion)
+            .map(|v| v.into())
+            .unwrap_or(0);
+        assert_eq!(table_version, 8, "Test table should be v8");
+
+        // Verify files partition is configured
+        let partitions = data_table.get_metadata_table_partitions();
+        assert!(
+            partitions.contains(&"files".to_string()),
+            "Should have 'files' partition configured"
+        );
+
+        // Verify is_metadata_table_enabled returns true (implicit enablement)
+        assert!(
+            data_table.is_metadata_table_enabled(),
+            "is_metadata_table_enabled should return true for v8 table with 
files partition"
+        );
+    }
+
+    #[test]
+    fn hudi_table_v6_metadata_table_not_enabled() {
+        // V6 tables should NOT have metadata table enabled, even with 
explicit setting
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+
+        // Verify it's a v6 table
+        let table_version: isize = hudi_table
+            .hudi_configs
+            .get(TableVersion)
+            .map(|v| v.into())
+            .unwrap_or(0);
+        assert_eq!(table_version, 6, "Test table should be v6");
+
+        // V6 tables should not have metadata table enabled
+        assert!(
+            !hudi_table.is_metadata_table_enabled(),
+            "is_metadata_table_enabled should return false for v6 table"
+        );
+    }
+
+    #[test]
+    fn hudi_table_is_not_metadata_table() {
+        // A regular data table should not be a metadata table
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        assert!(
+            !hudi_table.is_metadata_table(),
+            "Regular data table should not be a metadata table"
+        );
+    }
+
+    #[test]
+    fn hudi_table_metadata_table_is_metadata_table() {
+        // Create a metadata table and verify it's recognized as such
+        let data_table = get_data_table();
+        let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+        assert!(
+            metadata_table.is_metadata_table(),
+            "Metadata table should be recognized as a metadata table"
+        );
+    }
+
+    #[test]
+    fn hudi_table_new_metadata_table_from_metadata_table_errors() {
+        // Trying to create a metadata table from a metadata table should fail
+        let data_table = get_data_table();
+        let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+
+        let result = metadata_table.new_metadata_table_blocking();
+        assert!(result.is_err());
+        let err = result.unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("Cannot create metadata table from another metadata 
table"),
+            "Error message should indicate cannot create from metadata table"
+        );
+    }
+}
diff --git a/crates/core/src/metadata/table_record.rs 
b/crates/core/src/metadata/table/records.rs
similarity index 92%
rename from crates/core/src/metadata/table_record.rs
rename to crates/core/src/metadata/table/records.rs
index c972904..fd78c41 100644
--- a/crates/core/src/metadata/table_record.rs
+++ b/crates/core/src/metadata/table/records.rs
@@ -39,6 +39,40 @@ use apache_avro::types::Value as AvroValue;
 use apache_avro::Schema as AvroSchema;
 use std::collections::HashMap;
 
+/// Metadata table partition types.
+///
+/// These represent the different partitions (directories) within the metadata 
table,
+/// each storing a different type of index data.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum MetadataPartitionType {
+    /// The "files" partition containing file listings per data table 
partition.
+    Files,
+    /// The "column_stats" partition containing column statistics.
+    ColumnStats,
+    /// The "partition_stats" partition containing partition-level statistics.
+    PartitionStats,
+    /// The "record_index" partition containing record-level index entries.
+    RecordIndex,
+}
+
+impl MetadataPartitionType {
+    /// Get the partition directory name as used in the metadata table.
+    pub fn partition_name(&self) -> &'static str {
+        match self {
+            Self::Files => "files",
+            Self::ColumnStats => "column_stats",
+            Self::PartitionStats => "partition_stats",
+            Self::RecordIndex => "record_index",
+        }
+    }
+}
+
+impl std::fmt::Display for MetadataPartitionType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.partition_name())
+    }
+}
+
 /// File information from the metadata table.
 #[derive(Debug, Clone, PartialEq)]
 pub struct HoodieMetadataFileInfo {
@@ -123,6 +157,9 @@ impl FilesPartitionRecord {
     /// The partition name in the metadata table that stores file listings.
     pub const PARTITION_NAME: &'static str = "files";
 
+    /// The key for the record that contains all partition paths.
+    pub const ALL_PARTITIONS_KEY: &'static str = "__all_partitions__";
+
     /// Check if this is an ALL_PARTITIONS record.
     pub fn is_all_partitions(&self) -> bool {
         self.record_type == MetadataRecordType::AllPartitions
@@ -431,6 +468,40 @@ mod tests {
             .unwrap_or_else(|| panic!("No HFile found in {:?}", dir))
     }
 
+    #[test]
+    fn test_metadata_partition_type_partition_name() {
+        assert_eq!(MetadataPartitionType::Files.partition_name(), "files");
+        assert_eq!(
+            MetadataPartitionType::ColumnStats.partition_name(),
+            "column_stats"
+        );
+        assert_eq!(
+            MetadataPartitionType::PartitionStats.partition_name(),
+            "partition_stats"
+        );
+        assert_eq!(
+            MetadataPartitionType::RecordIndex.partition_name(),
+            "record_index"
+        );
+    }
+
+    #[test]
+    fn test_metadata_partition_type_display() {
+        assert_eq!(format!("{}", MetadataPartitionType::Files), "files");
+        assert_eq!(
+            format!("{}", MetadataPartitionType::ColumnStats),
+            "column_stats"
+        );
+        assert_eq!(
+            format!("{}", MetadataPartitionType::PartitionStats),
+            "partition_stats"
+        );
+        assert_eq!(
+            format!("{}", MetadataPartitionType::RecordIndex),
+            "record_index"
+        );
+    }
+
     #[test]
     fn test_metadata_record_type_from_i32() {
         assert_eq!(
@@ -502,7 +573,7 @@ mod tests {
         // Test ALL_PARTITIONS record
         let all_partitions_record = records
             .iter()
-            .find(|r| r.key_as_str() == Some("__all_partitions__"))
+            .find(|r| r.key_as_str() == 
Some(FilesPartitionRecord::ALL_PARTITIONS_KEY))
             .expect("__all_partitions__ record not found");
 
         let decoded = decode_files_partition_record(&reader, 
all_partitions_record)
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 24dc4ce..b76837f 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -17,20 +17,20 @@
  * under the License.
  */
 
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use crate::config::table::HudiTableConfig::BaseFileFormat;
 use crate::config::HudiConfigs;
-use crate::file_group::FileGroup;
-use crate::metadata::table_record::FilesPartitionRecord;
-use crate::storage::Storage;
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
-
 use crate::file_group::builder::file_groups_from_files_partition_records;
 use crate::file_group::file_slice::FileSlice;
+use crate::file_group::FileGroup;
+use crate::storage::Storage;
 use crate::table::listing::FileLister;
 use crate::table::partition::PartitionPruner;
+use crate::table::Table;
+use crate::timeline::completion_time::CompletionTimeView;
+use crate::timeline::view::TimelineView;
 use crate::Result;
 use dashmap::DashMap;
 
@@ -58,17 +58,12 @@ impl FileSystemView {
         })
     }
 
-    /// Load file groups by listing from the file system.
+    /// Load file groups by listing from the storage.
     ///
     /// # Arguments
     /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps.
-    ///   - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for 
all lookups)
-    ///   - For v8+ tables: Pass [`CompletionTimeView`] built from timeline 
instants
-    ///
-    /// [`V6CompletionTimeView`]: 
crate::timeline::completion_time::V6CompletionTimeView
-    /// [`CompletionTimeView`]: 
crate::timeline::completion_time::CompletionTimeView
-    async fn load_file_groups_from_file_system<V: TimelineViewByCompletionTime 
+ Sync>(
+    /// * `completion_time_view` - View to look up completion timestamps
+    async fn load_file_groups_from_storage<V: CompletionTimeView + Sync>(
         &self,
         partition_pruner: &PartitionPruner,
         completion_time_view: &V,
@@ -90,34 +85,25 @@ impl FileSystemView {
 
     /// Load file groups from metadata table records.
     ///
-    /// This is an alternative to `load_file_groups_from_file_system` that 
uses pre-fetched
-    /// metadata table records instead of listing from storage. Only 
partitions that pass the
-    /// partition pruner will be loaded.
+    /// This is an alternative to `load_file_groups_from_storage` that uses
+    /// file listing records fetched from the metadata table. Only partitions 
that
+    /// pass the partition pruner will be loaded.
     ///
-    /// This method is not async because it operates on pre-fetched records - 
no I/O is needed.
-    ///
-    /// TODO: Consider making this async and fetching metadata table records 
within this method
-    /// instead of receiving pre-fetched records. This would make the API more 
symmetric with
-    /// `load_file_groups_from_file_system` and encapsulate the metadata table 
fetching logic.
+    /// This method is not async because it operates on pre-fetched records.
     ///
     /// # Arguments
-    /// * `metadata_table_records` - Metadata table files partition records
+    /// * `records` - Metadata table files partition records
     /// * `partition_pruner` - Filters which partitions to include
-    /// * `completion_time_view` - View to look up completion timestamps.
-    ///   - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for 
all lookups)
-    ///   - For v8+ tables: Pass [`CompletionTimeView`] built from timeline 
instants
-    ///
-    /// [`V6CompletionTimeView`]: 
crate::timeline::completion_time::V6CompletionTimeView
-    /// [`CompletionTimeView`]: 
crate::timeline::completion_time::CompletionTimeView
-    fn load_file_groups_from_metadata_table<V: TimelineViewByCompletionTime>(
+    /// * `completion_time_view` - View to look up completion timestamps
+    fn load_file_groups_from_metadata_table_records<V: CompletionTimeView>(
         &self,
-        metadata_table_records: &HashMap<String, FilesPartitionRecord>,
+        records: &HashMap<String, 
crate::metadata::table_record::FilesPartitionRecord>,
         partition_pruner: &PartitionPruner,
         completion_time_view: &V,
     ) -> Result<()> {
         let base_file_format: String = 
self.hudi_configs.get_or_default(BaseFileFormat).into();
         let file_groups_map = file_groups_from_files_partition_records(
-            metadata_table_records,
+            records,
             &base_file_format,
             completion_time_view,
         )?;
@@ -132,15 +118,15 @@ impl FileSystemView {
         Ok(())
     }
 
-    /// Collect file slices from loaded file groups as of a given timestamp.
-    ///
-    /// This is a private method that assumes file groups have already been 
loaded.
-    async fn collect_file_slices_as_of(
+    /// Collect file slices from loaded file groups using the timeline view.
+    async fn collect_file_slices(
         &self,
-        timestamp: &str,
         partition_pruner: &PartitionPruner,
-        excluding_file_groups: &HashSet<FileGroup>,
+        timeline_view: &TimelineView,
     ) -> Result<Vec<FileSlice>> {
+        let timestamp = timeline_view.as_of_timestamp();
+        let excluding_file_groups = timeline_view.excluding_file_groups();
+
         let mut file_slices = Vec::new();
         for mut partition_entry in self.partition_to_file_groups.iter_mut() {
             if !partition_pruner.should_include(partition_entry.key()) {
@@ -160,44 +146,60 @@ impl FileSystemView {
         Ok(file_slices)
     }
 
-    /// Get file slices as of a given timestamp.
+    /// Get file slices using a [`TimelineView`].
+    ///
+    /// This is the main API for retrieving file slices for snapshot or 
time-travel queries.
+    /// It loads file groups from metadata table (if enabled) or storage 
listing,
+    /// then selects file slices based on the timeline view.
     ///
-    /// This is the single entrypoint for retrieving file slices. It 
internally decides
-    /// whether to load file groups from metadata table records (if provided) 
or from storage listing.
+    /// The [`TimelineView`] encapsulates:
+    /// - The "as of" timestamp for the query
+    /// - File groups to exclude (from replace commits for example)
+    /// - Completion time mappings (if needed)
     ///
     /// # Arguments
-    /// * `timestamp` - The timestamp to get file slices as of
     /// * `partition_pruner` - Filters which partitions to include
-    /// * `excluding_file_groups` - File groups to exclude (e.g., replaced 
file groups)
-    /// * `metadata_table_records` - Optional metadata table files partition 
records for
-    ///   accelerated listing. If provided, file groups are loaded from these 
records instead
-    ///   of storage listing.
-    /// * `completion_time_view` - View to look up completion timestamps.
-    ///   - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for 
all lookups)
-    ///   - For v8+ tables: Pass [`CompletionTimeView`] built from timeline 
instants
-    ///
-    /// [`V6CompletionTimeView`]: 
crate::timeline::completion_time::V6CompletionTimeView
-    /// [`CompletionTimeView`]: 
crate::timeline::completion_time::CompletionTimeView
-    pub async fn get_file_slices_as_of<V: TimelineViewByCompletionTime + Sync>(
+    /// * `timeline_view` - The timeline view containing query context
+    /// * `metadata_table` - Optional metadata table instance
+    pub(crate) async fn get_file_slices(
         &self,
-        timestamp: &str,
         partition_pruner: &PartitionPruner,
-        excluding_file_groups: &HashSet<FileGroup>,
-        metadata_table_records: Option<&HashMap<String, FilesPartitionRecord>>,
-        completion_time_view: &V,
+        timeline_view: &TimelineView,
+        metadata_table: Option<&Table>,
     ) -> Result<Vec<FileSlice>> {
-        // Load file groups from metadata table records if provided, otherwise 
from file system
-        if let Some(records) = metadata_table_records {
-            self.load_file_groups_from_metadata_table(
-                records,
+        if let Some(mdt) = metadata_table {
+            // Use metadata table for file listing
+            let records = 
mdt.fetch_files_partition_records(partition_pruner).await?;
+            self.load_file_groups_from_metadata_table_records(
+                &records,
                 partition_pruner,
-                completion_time_view,
+                timeline_view,
             )?;
         } else {
-            self.load_file_groups_from_file_system(partition_pruner, 
completion_time_view)
+            // Fall back to storage listing
+            self.load_file_groups_from_storage(partition_pruner, timeline_view)
                 .await?;
         }
-        self.collect_file_slices_as_of(timestamp, partition_pruner, 
excluding_file_groups)
+        self.collect_file_slices(partition_pruner, timeline_view)
+            .await
+    }
+
+    /// Get file slices using storage listing only.
+    ///
+    /// This method always lists files from storage, which is needed
+    /// for metadata table's own file listing flow to avoid async recursion.
+    ///
+    /// # Arguments
+    /// * `partition_pruner` - Filters which partitions to include
+    /// * `timeline_view` - The timeline view containing query context
+    pub(crate) async fn get_file_slices_by_storage_listing(
+        &self,
+        partition_pruner: &PartitionPruner,
+        timeline_view: &TimelineView,
+    ) -> Result<Vec<FileSlice>> {
+        self.load_file_groups_from_storage(partition_pruner, timeline_view)
+            .await?;
+        self.collect_file_slices(partition_pruner, timeline_view)
             .await
     }
 }
@@ -209,35 +211,28 @@ mod tests {
     use crate::table::Table;
 
     use hudi_test::SampleTable;
-    use std::collections::HashSet;
 
     #[tokio::test]
     async fn fs_view_get_latest_file_slices() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let latest_timestamp = hudi_table
-            .timeline
-            .completed_commits
-            .iter()
-            .next_back()
-            .map(|i| i.timestamp.clone())
-            .unwrap();
+        let latest_timestamp = 
hudi_table.timeline.get_latest_commit_timestamp().unwrap();
         let fs_view = &hudi_table.file_system_view;
-        let completion_time_view = 
hudi_table.timeline.create_completion_time_view();
 
         assert!(fs_view.partition_to_file_groups.is_empty());
+
+        let timeline_view = hudi_table
+            .timeline
+            .create_view_as_of(&latest_timestamp)
+            .await
+            .unwrap();
         let partition_pruner = PartitionPruner::empty();
-        let excludes = HashSet::new();
+
         let file_slices = fs_view
-            .get_file_slices_as_of(
-                &latest_timestamp,
-                &partition_pruner,
-                &excludes,
-                None, // mdt_records
-                &completion_time_view,
-            )
+            .get_file_slices(&partition_pruner, &timeline_view, None)
             .await
             .unwrap();
+
         assert_eq!(fs_view.partition_to_file_groups.len(), 1);
         assert_eq!(file_slices.len(), 1);
         let file_ids = file_slices
@@ -254,33 +249,23 @@ mod tests {
     async fn fs_view_get_latest_file_slices_with_replace_commit() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let latest_timestamp = hudi_table
-            .timeline
-            .completed_commits
-            .iter()
-            .next_back()
-            .map(|i| i.timestamp.clone())
-            .unwrap();
+        let latest_timestamp = 
hudi_table.timeline.get_latest_commit_timestamp().unwrap();
         let fs_view = &hudi_table.file_system_view;
-        let completion_time_view = 
hudi_table.timeline.create_completion_time_view();
 
         assert_eq!(fs_view.partition_to_file_groups.len(), 0);
-        let partition_pruner = PartitionPruner::empty();
-        let excludes = &hudi_table
+
+        let timeline_view = hudi_table
             .timeline
-            .get_replaced_file_groups_as_of(&latest_timestamp)
+            .create_view_as_of(&latest_timestamp)
             .await
             .unwrap();
+        let partition_pruner = PartitionPruner::empty();
+
         let file_slices = fs_view
-            .get_file_slices_as_of(
-                &latest_timestamp,
-                &partition_pruner,
-                excludes,
-                None, // mdt_records
-                &completion_time_view,
-            )
+            .get_file_slices(&partition_pruner, &timeline_view, None)
             .await
             .unwrap();
+
         assert_eq!(fs_view.partition_to_file_groups.len(), 3);
         assert_eq!(file_slices.len(), 1);
         let file_ids = file_slices
@@ -297,21 +282,14 @@ mod tests {
     async fn fs_view_get_latest_file_slices_with_partition_filters() {
         let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let latest_timestamp = hudi_table
-            .timeline
-            .completed_commits
-            .iter()
-            .next_back()
-            .map(|i| i.timestamp.clone())
-            .unwrap();
+        let latest_timestamp = 
hudi_table.timeline.get_latest_commit_timestamp().unwrap();
         let fs_view = &hudi_table.file_system_view;
-        let completion_time_view = 
hudi_table.timeline.create_completion_time_view();
 
         assert_eq!(fs_view.partition_to_file_groups.len(), 0);
 
-        let excludes = &hudi_table
+        let timeline_view = hudi_table
             .timeline
-            .get_replaced_file_groups_as_of(&latest_timestamp)
+            .create_view_as_of(&latest_timestamp)
             .await
             .unwrap();
         let partition_schema = 
hudi_table.get_partition_schema().await.unwrap();
@@ -326,15 +304,10 @@ mod tests {
         .unwrap();
 
         let file_slices = fs_view
-            .get_file_slices_as_of(
-                &latest_timestamp,
-                &partition_pruner,
-                excludes,
-                None, // mdt_records
-                &completion_time_view,
-            )
+            .get_file_slices(&partition_pruner, &timeline_view, None)
             .await
             .unwrap();
+
         assert_eq!(fs_view.partition_to_file_groups.len(), 1);
         assert_eq!(file_slices.len(), 1);
 
diff --git a/crates/core/src/table/listing.rs b/crates/core/src/table/listing.rs
index c7b3404..ca9b25e 100644
--- a/crates/core/src/table/listing.rs
+++ b/crates/core/src/table/listing.rs
@@ -28,7 +28,7 @@ use crate::storage::{get_leaf_dirs, Storage};
 use crate::table::partition::{
     is_table_partitioned, PartitionPruner, EMPTY_PARTITION_PATH, 
PARTITION_METAFIELD_PREFIX,
 };
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
+use crate::timeline::completion_time::CompletionTimeView;
 use crate::Result;
 use dashmap::DashMap;
 use futures::{stream, StreamExt, TryStreamExt};
@@ -65,15 +65,10 @@ impl FileLister {
     /// # Arguments
     /// * `partition_path` - The partition path to list files from
     /// * `completion_time_view` - View to look up completion timestamps.
-    ///   - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for 
all lookups)
-    ///   - For v8+ tables: Pass [`CompletionTimeView`] built from timeline 
instants
     ///
     /// Files whose commit timestamps are not found in the completion time view
     /// (i.e., uncommitted files) will have `completion_timestamp = None`.
-    ///
-    /// [`V6CompletionTimeView`]: 
crate::timeline::completion_time::V6CompletionTimeView
-    /// [`CompletionTimeView`]: 
crate::timeline::completion_time::CompletionTimeView
-    async fn list_file_groups_for_partition<V: TimelineViewByCompletionTime>(
+    async fn list_file_groups_for_partition<V: CompletionTimeView>(
         &self,
         partition_path: &str,
         completion_time_view: &V,
@@ -101,7 +96,7 @@ impl FileLister {
                 if completion_time_view.should_filter_uncommitted()
                     && base_file.completion_timestamp.is_none()
                 {
-                    // v8+ table: file is from an uncommitted commit, skip it
+                    // file belongs to an uncommitted commit, skip it
                     continue;
                 }
 
@@ -118,7 +113,7 @@ impl FileLister {
                         if completion_time_view.should_filter_uncommitted()
                             && log_file.completion_timestamp.is_none()
                         {
-                            // v8+ table: file is from an uncommitted commit, 
skip it
+                            // file belongs to an uncommitted commit, skip it
                             continue;
                         }
 
@@ -187,14 +182,7 @@ impl FileLister {
     ///
     /// # Arguments
     /// * `completion_time_view` - View to look up completion timestamps.
-    ///   - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for 
all lookups)
-    ///   - For v8+ tables: Pass [`CompletionTimeView`] built from timeline 
instants
-    ///
-    /// [`V6CompletionTimeView`]: 
crate::timeline::completion_time::V6CompletionTimeView
-    /// [`CompletionTimeView`]: 
crate::timeline::completion_time::CompletionTimeView
-    pub async fn list_file_groups_for_relevant_partitions<
-        V: TimelineViewByCompletionTime + Sync,
-    >(
+    pub async fn list_file_groups_for_relevant_partitions<V: 
CompletionTimeView + Sync>(
         &self,
         completion_time_view: &V,
     ) -> Result<DashMap<String, Vec<FileGroup>>> {
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 4ae088c..efa2110 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -94,19 +94,14 @@ pub mod partition;
 mod validation;
 
 use crate::config::read::HudiReadConfig;
-use crate::config::table::HudiTableConfig::{
-    MetadataTableEnabled, MetadataTablePartitions, PartitionFields, 
TableVersion,
-};
+use crate::config::table::HudiTableConfig::PartitionFields;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
-use crate::error::CoreError;
 use crate::expr::filter::{from_str_tuples, Filter};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
-use crate::metadata::table_record::FilesPartitionRecord;
 use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
 use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
-use crate::storage::util::join_url_segments;
 use crate::table::builder::TableBuilder;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
@@ -219,108 +214,6 @@ impl Table {
         self.table_type() == TableTypeValue::MergeOnRead.as_ref()
     }
 
-    /// Check if this table is a metadata table.
-    ///
-    /// Detection is based on the base path ending with `.hoodie/metadata`.
-    pub fn is_metadata_table(&self) -> bool {
-        let base_path: String = self
-            .hudi_configs
-            .get_or_default(HudiTableConfig::BasePath)
-            .into();
-        crate::util::path::is_metadata_table_path(&base_path)
-    }
-
-    /// Get the list of available metadata table partitions for this table.
-    ///
-    /// Returns the partitions configured in 
`hoodie.table.metadata.partitions`.
-    pub fn get_metadata_table_partitions(&self) -> Vec<String> {
-        self.hudi_configs
-            .get_or_default(MetadataTablePartitions)
-            .into()
-    }
-
-    /// Check if the metadata table "files" partition is enabled for file 
listing.
-    ///
-    /// Returns `true` if:
-    /// 1. Table version is >= 8 (MDT support is only for v8+ tables), AND
-    /// 2. Either:
-    ///    - `hoodie.metadata.enable` is explicitly true, OR
-    ///    - "files" is in the configured `hoodie.table.metadata.partitions`
-    ///      (implicit enablement for v8+ tables with configured partitions)
-    ///
-    /// This matches Hudi Java behavior where metadata table is considered 
active
-    /// when partitions are configured, even without explicit 
`hoodie.metadata.enable=true`.
-    pub fn is_metadata_table_enabled(&self) -> bool {
-        // Check table version first - MDT is only supported for v8+ tables
-        // TODO: drop v6 support then no need to check table version here
-        let table_version: isize = self
-            .hudi_configs
-            .get(TableVersion)
-            .map(|v| v.into())
-            .unwrap_or(0);
-
-        if table_version < 8 {
-            return false;
-        }
-
-        // Check if "files" partition is configured
-        let has_files_partition = self
-            .get_metadata_table_partitions()
-            .contains(&FilesPartitionRecord::PARTITION_NAME.to_string());
-
-        // Explicit check for hoodie.metadata.enable
-        let metadata_explicitly_enabled: bool = self
-            .hudi_configs
-            .get_or_default(MetadataTableEnabled)
-            .into();
-
-        // Enable if explicitly enabled OR if files partition is configured.
-        // Note: For v8+ tables, having files partition configured implicitly 
enables MDT
-        // even if hoodie.metadata.enable is not set or is false. This is 
because if the
-        // files partition exists, MDT must have been enabled to populate it 
(either by
-        // Hudi writer or during table migration). This handles tables where 
the explicit
-        // config was not persisted but MDT was actively used.
-        metadata_explicitly_enabled || has_files_partition
-    }
-
-    /// Create a metadata table instance for this data table.
-    ///
-    /// Uses all partitions from `hoodie.table.metadata.partitions` 
configuration.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if the metadata table cannot be created or if there 
are
-    /// no metadata table partitions configured.
-    pub async fn new_metadata_table(&self) -> Result<Table> {
-        if self.is_metadata_table() {
-            return Err(CoreError::MetadataTable(
-                "Cannot create metadata table from another metadata 
table".to_string(),
-            ));
-        }
-
-        let mdt_partitions = self.get_metadata_table_partitions();
-        if mdt_partitions.is_empty() {
-            return Err(CoreError::MetadataTable(
-                "No metadata table partitions configured".to_string(),
-            ));
-        }
-
-        let mdt_url = join_url_segments(&self.base_url(), &[".hoodie", 
"metadata"])?;
-        Table::new_with_options(
-            mdt_url.as_str(),
-            [(PartitionFields.as_ref(), METADATA_TABLE_PARTITION_FIELD)],
-        )
-        .await
-    }
-
-    /// Same as [Table::new_metadata_table], but blocking.
-    pub fn new_metadata_table_blocking(&self) -> Result<Table> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.new_metadata_table().await })
-    }
-
     pub fn timezone(&self) -> String {
         self.hudi_configs
             .get_or_default(HudiTableConfig::TimelineTimezone)
@@ -370,7 +263,7 @@ impl Table {
     /// Get the latest partition [arrow_schema::Schema] of the table.
     ///
     /// For metadata tables, returns a schema with a single `partition` field
-    /// typed as [arrow_schema::DataType::Utf8], since MDT uses a single 
partition
+    /// typed as [arrow_schema::DataType::Utf8], since metadata tables use a single partition
     /// column to identify partitions like "files", "column_stats", etc.
     ///
     /// For regular tables, returns the partition fields with their actual data types
@@ -579,35 +472,20 @@ impl Table {
         timestamp: &str,
         filters: &[Filter],
     ) -> Result<Vec<FileSlice>> {
-        // Create completion time view for setting completion timestamps on 
files.
-        // For v8+ tables, this populates completion timestamps and enables 
filtering.
-        // For v6 tables, the view is empty and acts as a no-op.
-        let completion_time_view = self.timeline.create_completion_time_view();
+        let timeline_view = self.timeline.create_view_as_of(timestamp).await?;
 
-        // File slices are keyed by commit_timestamp (request/base instant 
time).
-        let excludes = self
-            .timeline
-            .get_replaced_file_groups_as_of(timestamp)
-            .await?;
         let partition_schema = self.get_partition_schema().await?;
         let partition_pruner =
             PartitionPruner::new(filters, &partition_schema, 
self.hudi_configs.as_ref())?;
 
-        // Try metadata table accelerated listing if files partition is 
configured
-        let metadata_table_records = if self.is_metadata_table_enabled() {
+        // Try to create metadata table instance if enabled
+        let metadata_table = if self.is_metadata_table_enabled() {
             log::debug!("Using metadata table for file listing");
-            match self.fetch_metadata_table_records().await {
-                Ok(records) => {
-                    log::debug!(
-                        "Successfully read {} partition records from metadata 
table",
-                        records.len()
-                    );
-                    Some(records)
-                }
+            match self.new_metadata_table().await {
+                Ok(mdt) => Some(mdt),
                 Err(e) => {
-                    // Fall through to storage listing if metadata table read 
fails
                     log::warn!(
-                        "Failed to read file slices from metadata table, 
falling back to storage listing: {}",
+                        "Failed to create metadata table, falling back to 
storage listing: {}",
                         e
                     );
                     None
@@ -617,28 +495,11 @@ impl Table {
             None
         };
 
-        // Use the single entrypoint that handles both metadata table and 
storage listing
         self.file_system_view
-            .get_file_slices_as_of(
-                timestamp,
-                &partition_pruner,
-                &excludes,
-                metadata_table_records.as_ref(),
-                &completion_time_view,
-            )
+            .get_file_slices(&partition_pruner, &timeline_view, 
metadata_table.as_ref())
             .await
     }
 
-    /// Fetch file records from the metadata table.
-    ///
-    /// The metadata table returns records as-of its current state. For time 
travel
-    /// or incremental queries, the timestamp filtering is handled by the 
caller
-    /// using completion time views - the metadata table just provides the 
file listing.
-    async fn fetch_metadata_table_records(&self) -> Result<HashMap<String, 
FilesPartitionRecord>> {
-        let metadata_table = self.new_metadata_table().await?;
-        metadata_table.read_metadata_files().await
-    }
-
     /// Get all the changed [FileSlice]s in the table between the given timestamps.
     ///
     /// # Arguments
@@ -782,78 +643,6 @@ impl Table {
         )
     }
 
-    /// Read records from the "files" partition of the metadata table.
-    ///
-    /// This method can only be called on metadata tables. It reads all records
-    /// from the "files" partition and returns merged `FilesPartitionRecord`s.
-    ///
-    /// # Returns
-    /// A HashMap mapping record keys to their `FilesPartitionRecord`s.
-    pub async fn read_metadata_files(&self) -> Result<HashMap<String, 
FilesPartitionRecord>> {
-        if !self.is_metadata_table() {
-            return Err(CoreError::MetadataTable(
-                "read_metadata_files can only be called on metadata 
tables".to_string(),
-            ));
-        }
-
-        // Use completion timestamp for file slice queries (v8+ tables key by 
completion time)
-        let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() else {
-            return Ok(HashMap::new());
-        };
-
-        // For MDT, always use storage-based listing (not MDT-accelerated)
-        // to avoid recursion. MDT never uses itself for file listing.
-        let excludes = self
-            .timeline
-            .get_replaced_file_groups_as_of(timestamp)
-            .await?;
-        let filters = from_str_tuples([(
-            METADATA_TABLE_PARTITION_FIELD,
-            "=",
-            FilesPartitionRecord::PARTITION_NAME,
-        )])?;
-        let partition_schema = self.get_partition_schema().await?;
-        let partition_pruner =
-            PartitionPruner::new(&filters, &partition_schema, 
self.hudi_configs.as_ref())?;
-        let completion_time_view = self.timeline.create_completion_time_view();
-        let file_slices = self
-            .file_system_view
-            .get_file_slices_as_of(
-                timestamp,
-                &partition_pruner,
-                &excludes,
-                None, // Never use MDT for reading MDT itself (avoid recursion)
-                &completion_time_view,
-            )
-            .await?;
-
-        if file_slices.len() != 1 {
-            return Err(CoreError::MetadataTable(format!(
-                "Expected 1 file slice for {} partition, got {}",
-                FilesPartitionRecord::PARTITION_NAME,
-                file_slices.len()
-            )));
-        }
-        let file_slice = &file_slices[0];
-
-        let fg_reader = self.create_file_group_reader_with_options([(
-            HudiReadConfig::FileGroupEndTimestamp,
-            timestamp,
-        )])?;
-
-        fg_reader
-            .read_file_slice_from_metadata_table(file_slice)
-            .await
-    }
-
-    /// Same as [Table::read_metadata_files], but blocking.
-    pub fn read_metadata_files_blocking(&self) -> Result<HashMap<String, 
FilesPartitionRecord>> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(self.read_metadata_files())
-    }
-
     /// Get all the latest records in the table.
     ///
     /// # Arguments
@@ -949,23 +738,24 @@ impl Table {
     ) -> Result<Vec<RecordBatch>> {
         // If the end timestamp is not provided, use the latest file slice timestamp.
         // This is the request timestamp (commit_timestamp) for both v6 and v8+ tables.
-        let Some(end_timestamp) =
+        let Some(end_ts) =
             end_timestamp.or_else(|| 
self.timeline.get_latest_commit_timestamp_as_option())
         else {
             return Ok(Vec::new());
         };
 
         let timezone = self.timezone();
-        let start_timestamp = format_timestamp(start_timestamp, &timezone)?;
-        let end_timestamp = format_timestamp(end_timestamp, &timezone)?;
+        let start_ts = format_timestamp(start_timestamp, &timezone)?;
+        let end_ts = format_timestamp(end_ts, &timezone)?;
 
+        // Use incremental API that reads from timeline commit metadata
         let file_slices = self
-            .get_file_slices_between_internal(&start_timestamp, &end_timestamp)
+            .get_file_slices_between_internal(&start_ts, &end_ts)
             .await?;
 
         let fg_reader = self.create_file_group_reader_with_options([
-            (HudiReadConfig::FileGroupStartTimestamp, start_timestamp),
-            (HudiReadConfig::FileGroupEndTimestamp, end_timestamp),
+            (HudiReadConfig::FileGroupStartTimestamp, start_ts.clone()),
+            (HudiReadConfig::FileGroupEndTimestamp, end_ts),
         ])?;
 
         let batches =
@@ -1016,9 +806,9 @@ mod tests {
     use crate::config::HUDI_CONF_DIR;
     use crate::error::CoreError;
     use crate::metadata::meta_field::MetaField;
-    use crate::metadata::table_record::MetadataRecordType;
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
+    use crate::timeline::EARLIEST_START_TIMESTAMP;
     use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq, 
SampleTable};
     use std::collections::HashSet;
     use std::fs::canonicalize;
@@ -1689,190 +1479,4 @@ mod tests {
         let expected = HashSet::new();
         assert_eq!(actual, expected);
     }
-
-    // 
=========================================================================
-    // Metadata Table Tests
-    // 
=========================================================================
-
-    fn get_data_table() -> Table {
-        use hudi_test::QuickstartTripsTable;
-        let table_path = 
QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
-        Table::new_blocking(&table_path).unwrap()
-    }
-
-    #[test]
-    fn hudi_table_read_metadata_files() {
-        let data_table = get_data_table();
-        let metadata_table = data_table.new_metadata_table_blocking().unwrap();
-
-        assert!(metadata_table.is_metadata_table());
-
-        let records = metadata_table.read_metadata_files_blocking().unwrap();
-
-        // Should have 4 records: __all_partitions__ + 3 city partitions
-        assert_eq!(records.len(), 4);
-
-        // Validate __all_partitions__ record
-        let all_partitions = records.get("__all_partitions__").unwrap();
-        assert_eq!(
-            all_partitions.record_type,
-            MetadataRecordType::AllPartitions
-        );
-        let partition_names: HashSet<_> = 
all_partitions.partition_names().into_iter().collect();
-        assert_eq!(
-            partition_names,
-            HashSet::from(["city=chennai", "city=san_francisco", 
"city=sao_paulo"])
-        );
-
-        // Validate city=chennai record with actual file names
-        let chennai = records.get("city=chennai").unwrap();
-        assert_eq!(chennai.record_type, MetadataRecordType::Files);
-        let chennai_files: HashSet<_> = 
chennai.active_file_names().into_iter().collect();
-        assert_eq!(
-            chennai_files,
-            HashSet::from([
-                
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_2-986-2794_20251220210108078.parquet",
-                
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_0-1112-3190_20251220210129235.parquet",
-                
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210127080.log.1_0-1072-3078",
-                
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210128625.log.1_0-1097-3150",
-            ])
-        );
-        assert!(chennai.total_size() > 0);
-    }
-
-    #[test]
-    fn hudi_table_get_metadata_table_partitions() {
-        let data_table = get_data_table();
-
-        // Verify we can get the MDT partitions from the data table
-        let partitions = data_table.get_metadata_table_partitions();
-
-        // The test table has 5 MDT partitions configured
-        assert_eq!(
-            partitions.len(),
-            5,
-            "Should have 5 MDT partitions, got: {:?}",
-            partitions
-        );
-
-        // Verify all expected partitions are present
-        let expected = [
-            "column_stats",
-            "files",
-            "partition_stats",
-            "record_index",
-            "secondary_index_rider_idx",
-        ];
-        for partition in &expected {
-            assert!(
-                partitions.contains(&partition.to_string()),
-                "Should contain '{}' partition, got: {:?}",
-                partition,
-                partitions
-            );
-        }
-    }
-
-    #[test]
-    fn hudi_table_is_metadata_table_enabled() {
-        // V8 table with files partition configured should enable MDT
-        // even without explicit hoodie.metadata.enable=true
-        let data_table = get_data_table();
-
-        // Verify it's a v8 table
-        let table_version: isize = data_table
-            .hudi_configs
-            .get(TableVersion)
-            .map(|v| v.into())
-            .unwrap_or(0);
-        assert_eq!(table_version, 8, "Test table should be v8");
-
-        // Verify files partition is configured
-        let partitions = data_table.get_metadata_table_partitions();
-        assert!(
-            partitions.contains(&"files".to_string()),
-            "Should have 'files' partition configured"
-        );
-
-        // Verify is_metadata_table_enabled returns true (implicit enablement)
-        assert!(
-            data_table.is_metadata_table_enabled(),
-            "is_metadata_table_enabled should return true for v8 table with 
files partition"
-        );
-    }
-
-    #[test]
-    fn hudi_table_v6_metadata_table_not_enabled() {
-        // V6 tables should NOT have MDT enabled, even with explicit setting
-        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
-
-        // Verify it's a v6 table
-        let table_version: isize = hudi_table
-            .hudi_configs
-            .get(TableVersion)
-            .map(|v| v.into())
-            .unwrap_or(0);
-        assert_eq!(table_version, 6, "Test table should be v6");
-
-        // V6 tables should not have MDT enabled
-        assert!(
-            !hudi_table.is_metadata_table_enabled(),
-            "is_metadata_table_enabled should return false for v6 table"
-        );
-    }
-
-    #[test]
-    fn hudi_table_is_not_metadata_table() {
-        // A regular data table should not be a metadata table
-        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
-        assert!(
-            !hudi_table.is_metadata_table(),
-            "Regular data table should not be a metadata table"
-        );
-    }
-
-    #[test]
-    fn hudi_table_metadata_table_is_metadata_table() {
-        // Create a metadata table and verify it's recognized as such
-        let data_table = get_data_table();
-        let metadata_table = data_table.new_metadata_table_blocking().unwrap();
-        assert!(
-            metadata_table.is_metadata_table(),
-            "Metadata table should be recognized as a metadata table"
-        );
-    }
-
-    #[test]
-    fn hudi_table_new_metadata_table_from_metadata_table_errors() {
-        // Trying to create a metadata table from a metadata table should fail
-        let data_table = get_data_table();
-        let metadata_table = data_table.new_metadata_table_blocking().unwrap();
-
-        let result = metadata_table.new_metadata_table_blocking();
-        assert!(result.is_err());
-        let err = result.unwrap_err();
-        assert!(
-            err.to_string()
-                .contains("Cannot create metadata table from another metadata 
table"),
-            "Error message should indicate cannot create from metadata table"
-        );
-    }
-
-    #[tokio::test]
-    async fn hudi_table_read_metadata_files_on_non_metadata_table_errors() {
-        // Calling read_metadata_files on a non-metadata table should fail
-        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
-
-        let result = hudi_table.read_metadata_files().await;
-        assert!(result.is_err());
-        let err = result.unwrap_err();
-        assert!(
-            err.to_string()
-                .contains("can only be called on metadata tables"),
-            "Error message should indicate read_metadata_files requires 
metadata table"
-        );
-    }
 }
diff --git a/crates/core/src/timeline/completion_time.rs b/crates/core/src/timeline/completion_time.rs
index c6cd4dc..f68630c 100644
--- a/crates/core/src/timeline/completion_time.rs
+++ b/crates/core/src/timeline/completion_time.rs
@@ -19,201 +19,39 @@
 
 //! Completion time query view for looking up completion timestamps from request timestamps.
 //!
-//! This module provides the [`TimelineViewByCompletionTime`] trait and 
implementations
-//! for mapping request timestamps to completion timestamps. This is essential 
for
-//! v8+ tables where file names contain request timestamps but file ordering 
and
-//! association must be based on completion timestamps.
+//! This module provides the [`CompletionTimeView`] trait for mapping request timestamps
+//! to completion timestamps. This is essential for timeline layout v2 where file names contain
+//! request timestamps but file ordering and association must be based on completion timestamps.
 //!
-//! # Table Version Differences
+//! # Timeline Layout Differences
 //!
-//! - **v6 tables**: Completion timestamps are not tracked. Use 
[`V6CompletionTimeView`]
-//!   which returns `None` for all lookups (completion_timestamp remains 
unset).
+//! - **Timeline layout v1**: Completion timestamps are not tracked. The completion time map is empty
+//!   and `get_completion_time()` always returns `None`.
 //!
-//! - **v8+ tables**: Request and completion timestamps are different. The 
completion timestamp
+//! - **Timeline layout v2**: Completion timestamps are tracked in completed instants. The completion timestamp
 //!   is stored in the timeline instant filename as `{request}_{completion}.{action}`.
-//!   Use [`CompletionTimeView`] to look up completion timestamps from a map.
-
-use crate::timeline::instant::Instant;
-use std::collections::HashMap;
+//!   The completion time map is populated from timeline instants.
+//!
+//! # Implementation
+//!
+//! [`TimelineView`] is the main implementation of this trait. It checks
+//! [`HudiTableConfig::TimelineLayoutVersion`] to determine whether to build the completion time map.
+//!
+//! [`TimelineView`]: crate::timeline::view::TimelineView
+//! [`HudiTableConfig::TimelineLayoutVersion`]: crate::config::table::HudiTableConfig::TimelineLayoutVersion
 
 /// A view for querying completion timestamps from request timestamps.
 ///
 /// This trait abstracts the completion time lookup logic to support both
-/// v6 tables (where completion time is not tracked) and v8+ tables (where
-/// request and completion timestamps differ).
-pub trait TimelineViewByCompletionTime {
+/// V1 and V2 timeline layouts.
+pub trait CompletionTimeView {
     /// Get the completion timestamp for a given request timestamp.
     ///
     /// Returns `Some(completion_timestamp)` if the request timestamp corresponds
     /// to a completed commit, `None` if the commit is pending/unknown or if
-    /// completion time is not tracked (v6 tables).
+    /// completion time is not tracked.
     fn get_completion_time(&self, request_timestamp: &str) -> Option<&str>;
 
     /// Returns true if uncommitted files should be filtered out.
-    ///
-    /// For v8+ tables, this returns true because we track completion times
-    /// and can distinguish committed from uncommitted files.
-    /// For v6 tables, this returns false because completion time is not 
tracked.
-    fn should_filter_uncommitted(&self) -> bool {
-        false
-    }
-}
-
-/// Completion time view for v6 tables.
-///
-/// In v6, completion time is not tracked separately from request time.
-/// This view always returns `None`, meaning the caller should use the
-/// request timestamp directly for any completion-time-based logic.
-#[derive(Debug, Default)]
-pub struct V6CompletionTimeView;
-
-impl V6CompletionTimeView {
-    pub fn new() -> Self {
-        Self
-    }
-}
-
-impl TimelineViewByCompletionTime for V6CompletionTimeView {
-    fn get_completion_time(&self, _request_timestamp: &str) -> Option<&str> {
-        None
-    }
-}
-
-/// Completion time view for v8+ tables backed by a HashMap.
-///
-/// This view is built from timeline instants and maps request timestamps
-/// to their corresponding completion timestamps.
-#[derive(Debug)]
-pub struct CompletionTimeView {
-    /// Map from request timestamp to completion timestamp
-    request_to_completion: HashMap<String, String>,
-}
-
-impl CompletionTimeView {
-    /// Create a new completion time view from timeline instants.
-    ///
-    /// Only completed instants with a completion timestamp are included.
-    pub fn from_instants<'a, I>(instants: I) -> Self
-    where
-        I: IntoIterator<Item = &'a Instant>,
-    {
-        let request_to_completion = instants
-            .into_iter()
-            .filter_map(|instant| {
-                instant
-                    .completion_timestamp
-                    .as_ref()
-                    .map(|completion_ts| (instant.timestamp.clone(), 
completion_ts.clone()))
-            })
-            .collect();
-
-        Self {
-            request_to_completion,
-        }
-    }
-
-    /// Create a new empty completion time view.
-    pub fn empty() -> Self {
-        Self {
-            request_to_completion: HashMap::new(),
-        }
-    }
-
-    /// Check if this view has any completion time mappings.
-    pub fn is_empty(&self) -> bool {
-        self.request_to_completion.is_empty()
-    }
-
-    /// Get the number of completion time mappings.
-    pub fn len(&self) -> usize {
-        self.request_to_completion.len()
-    }
-}
-
-impl TimelineViewByCompletionTime for CompletionTimeView {
-    fn get_completion_time(&self, request_timestamp: &str) -> Option<&str> {
-        self.request_to_completion
-            .get(request_timestamp)
-            .map(|s| s.as_str())
-    }
-
-    fn should_filter_uncommitted(&self) -> bool {
-        // Only filter if we have completion time data (v8+ table)
-        !self.is_empty()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::timeline::instant::{Action, State};
-
-    fn create_instant(request_ts: &str, completion_ts: Option<&str>) -> 
Instant {
-        Instant {
-            timestamp: request_ts.to_string(),
-            completion_timestamp: completion_ts.map(|s| s.to_string()),
-            action: Action::Commit,
-            state: State::Completed,
-            epoch_millis: 0,
-        }
-    }
-
-    #[test]
-    fn test_v6_completion_time_view() {
-        let view = V6CompletionTimeView::new();
-
-        // V6 view always returns None for completion time
-        // (caller should use request time directly for v6 tables)
-        assert!(view.get_completion_time("20240101120000000").is_none());
-        assert!(view.get_completion_time("any_timestamp").is_none());
-    }
-
-    #[test]
-    fn test_timeline_completion_time_view_from_instants() {
-        let instants = vec![
-            create_instant("20240101120000000", Some("20240101120005000")),
-            create_instant("20240101130000000", Some("20240101130010000")),
-            create_instant("20240101140000000", None), // Pending instant
-        ];
-
-        let view = CompletionTimeView::from_instants(&instants);
-
-        // Completed instants should have completion times
-        assert_eq!(
-            view.get_completion_time("20240101120000000"),
-            Some("20240101120005000")
-        );
-        assert_eq!(
-            view.get_completion_time("20240101130000000"),
-            Some("20240101130010000")
-        );
-
-        // Pending instant should not have completion time
-        assert!(view.get_completion_time("20240101140000000").is_none());
-
-        // Unknown timestamp should return None
-        assert!(view.get_completion_time("unknown").is_none());
-    }
-
-    #[test]
-    fn test_timeline_completion_time_view_empty() {
-        let view = CompletionTimeView::empty();
-
-        assert!(view.is_empty());
-        assert_eq!(view.len(), 0);
-        assert!(view.get_completion_time("any").is_none());
-    }
-
-    #[test]
-    fn test_timeline_completion_time_view_len() {
-        let instants = vec![
-            create_instant("20240101120000000", Some("20240101120005000")),
-            create_instant("20240101130000000", Some("20240101130010000")),
-        ];
-
-        let view = CompletionTimeView::from_instants(&instants);
-
-        assert!(!view.is_empty());
-        assert_eq!(view.len(), 2);
-    }
+    fn should_filter_uncommitted(&self) -> bool;
 }
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 20d8b5a..9f7f8af 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -23,22 +23,21 @@ pub mod loader;
 pub mod lsm_tree;
 pub(crate) mod selector;
 pub(crate) mod util;
+pub mod view;
 
 use crate::config::HudiConfigs;
 use crate::error::CoreError;
-use crate::file_group::builder::{
-    file_groups_from_commit_metadata, 
replaced_file_groups_from_replace_commit, FileGroupMerger,
-};
+use crate::file_group::builder::replaced_file_groups_from_replace_commit;
 use crate::file_group::FileGroup;
 use crate::schema::resolver::{
     resolve_avro_schema_from_commit_metadata, 
resolve_schema_from_commit_metadata,
 };
 use crate::storage::Storage;
 use crate::timeline::builder::TimelineBuilder;
-use crate::timeline::completion_time::CompletionTimeView;
 use crate::timeline::instant::Action;
 use crate::timeline::loader::TimelineLoader;
 use crate::timeline::selector::TimelineSelector;
+use crate::timeline::view::TimelineView;
 use crate::Result;
 use arrow_schema::Schema;
 use instant::Instant;
@@ -249,16 +248,16 @@ impl Timeline {
             .map_or_else(|| Err(CoreError::TimelineNoCommit), |t| 
Ok(t.to_string()))
     }
 
-    /// Create a [CompletionTimeView] from the completed commits.
-    ///
-    /// This view maps request timestamps to completion timestamps, enabling
-    /// correct file association for v8+ tables where request and completion
-    /// timestamps differ.
-    ///
-    /// For v6 tables, the view will be empty since completion_timestamp is 
None
-    /// for all instants, and the caller should use the request timestamp 
directly.
-    pub fn create_completion_time_view(&self) -> CompletionTimeView {
-        CompletionTimeView::from_instants(&self.completed_commits)
+    /// Create a [TimelineView] as of the given timestamp.
+    pub async fn create_view_as_of(&self, timestamp: &str) -> 
Result<TimelineView> {
+        let excludes = self.get_replaced_file_groups_as_of(timestamp).await?;
+        Ok(TimelineView::new(
+            timestamp.to_string(),
+            None,
+            &self.completed_commits,
+            excludes,
+            &self.hudi_configs,
+        ))
     }
 
     /// Get the latest [apache_avro::schema::Schema] as [String] from the [Timeline].
@@ -301,21 +300,28 @@ impl Timeline {
         Ok(file_groups)
     }
 
-    /// Get file groups in the timeline ranging from start (exclusive) to end 
(inclusive).
-    /// File groups are as of the [end] timestamp or the latest if not given.
+    /// Get file groups from commit metadata for commits in a time range.
+    ///
+    /// This is used for incremental queries where we only want file groups
+    /// that were modified in the time range (start, end].
     ///
-    /// For v8+ tables, the completion timestamps from the timeline instants 
are used
-    /// to set the completion_timestamp on base files and log files, enabling 
correct
-    /// file slice association based on completion order rather than request 
order.
+    /// # Arguments
+    /// * `start_timestamp` - Start of the time range (exclusive), None means no lower bound
+    /// * `end_timestamp` - End of the time range (inclusive), None means no upper bound
     ///
-    /// For v6 tables, completion_timestamp is None (v6 does not track 
completion times).
+    /// # Returns
+    /// File groups that were modified in the time range, excluding replaced file groups.
     pub(crate) async fn get_file_groups_between(
         &self,
         start_timestamp: Option<&str>,
         end_timestamp: Option<&str>,
     ) -> Result<HashSet<FileGroup>> {
-        let mut file_groups: HashSet<FileGroup> = HashSet::new();
-        let mut replaced_file_groups: HashSet<FileGroup> = HashSet::new();
+        use crate::file_group::builder::{
+            file_groups_from_commit_metadata, 
replaced_file_groups_from_replace_commit,
+            FileGroupMerger,
+        };
+
+        // Get commits in the time range (start, end]
         let selector = TimelineSelector::completed_actions_in_range(
             DEFAULT_LOADING_ACTIONS,
             self.hudi_configs.clone(),
@@ -323,11 +329,21 @@ impl Timeline {
             end_timestamp,
         )?;
         let commits = selector.select(self)?;
+        if commits.is_empty() {
+            return Ok(HashSet::new());
+        }
 
-        // Build completion time view from all commits for v8+ tables.
-        // Each file (base or log) may have a different completion timestamp,
-        // looked up from its own request timestamp.
-        let completion_time_view = CompletionTimeView::from_instants(&commits);
+        // Build completion time view from selected commits only.
+        let completion_time_view = TimelineView::new(
+            commits.last().unwrap().timestamp.clone(),
+            Some(commits.first().unwrap().timestamp.clone()),
+            &commits,
+            HashSet::new(),
+            &self.hudi_configs,
+        );
+
+        let mut file_groups: HashSet<FileGroup> = HashSet::new();
+        let mut replaced_file_groups: HashSet<FileGroup> = HashSet::new();
 
         for commit in commits {
             let commit_metadata = self.get_instant_metadata(&commit).await?;
diff --git a/crates/core/src/timeline/view.rs b/crates/core/src/timeline/view.rs
new file mode 100644
index 0000000..9aa2b45
--- /dev/null
+++ b/crates/core/src/timeline/view.rs
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Timeline view for filtering file slices.
+//!
+//! This module provides the [`TimelineView`] struct which encapsulates all
+//! timeline-derived context needed for file slice queries:
+//! - Query timestamp (as of)
+//! - Start timestamp (for incremental queries)
+//! - Completion time mappings
+//! - File groups to be excluded (e.g., replaced by clustering)
+//!
+//! [`TimelineView`] implements [`CompletionTimeView`] trait and is the main
+//! type used for completion time lookups throughout the codebase.
+
+use crate::config::table::HudiTableConfig::TimelineLayoutVersion;
+use crate::config::HudiConfigs;
+use crate::file_group::FileGroup;
+use crate::timeline::completion_time::CompletionTimeView;
+use crate::timeline::instant::Instant;
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+/// Timeline view for filtering file slices.
+///
+/// See module-level documentation for details.
+#[derive(Debug)]
+pub struct TimelineView {
+    /// The "as of" timestamp for the snapshot query.
+    /// It is also the end timestamp for incremental queries.
+    as_of_timestamp: String,
+
+    /// The start timestamp when the view is used for incremental queries.
+    #[allow(dead_code)]
+    start_timestamp: Option<String>,
+
+    /// File groups to exclude from the query result.
+    ///
+    /// These are file groups that have been replaced by clustering
+    /// or insert overwrite operations before the query timestamp.
+    excluding_file_groups: HashSet<FileGroup>,
+
+    /// Map from request timestamp to completion timestamp.
+    ///
+    /// Populated for timeline layout v2. Empty for v1.
+    request_to_completion: HashMap<String, String>,
+
+    /// Whether this table uses timeline layout v2 (completion time tracking).
+    is_timeline_layout_v2: bool,
+}
+
+impl TimelineView {
+    /// Create a new timeline view.
+    ///
+    /// # Arguments
+    /// * `as_of_timestamp` - The "as of" timestamp for the snapshot and time-travel query; also the end timestamp for incremental queries
+    /// * `start_timestamp` - The start timestamp for incremental queries
+    /// * `completed_commits` - Iterator over completed commit instants to build the view from
+    /// * `excluding_file_groups` - File groups to exclude (e.g., replaced by clustering)
+    /// * `hudi_configs` - The shared Hudi configurations
+    pub fn new<'a, I>(
+        as_of_timestamp: String,
+        start_timestamp: Option<String>,
+        completed_commits: I,
+        excluding_file_groups: HashSet<FileGroup>,
+        hudi_configs: &Arc<HudiConfigs>,
+    ) -> Self
+    where
+        I: IntoIterator<Item = &'a Instant>,
+    {
+        // Only build completion time map for timeline layout v2
+        let timeline_layout_version: isize = hudi_configs
+            .get(TimelineLayoutVersion)
+            .map(|v| v.into())
+            .unwrap_or(0);
+
+        let is_timeline_layout_v2 = timeline_layout_version >= 2;
+        let request_to_completion = if is_timeline_layout_v2 {
+            Self::build_completion_time_map(completed_commits)
+        } else {
+            HashMap::new()
+        };
+
+        Self {
+            as_of_timestamp,
+            start_timestamp,
+            excluding_file_groups,
+            request_to_completion,
+            is_timeline_layout_v2,
+        }
+    }
+
+    /// Build the completion time map from instants.
+    fn build_completion_time_map<'a, I>(instants: I) -> HashMap<String, String>
+    where
+        I: IntoIterator<Item = &'a Instant>,
+    {
+        instants
+            .into_iter()
+            .filter_map(|instant| {
+                instant
+                    .completion_timestamp
+                    .as_ref()
+                    .map(|completion_ts| (instant.timestamp.clone(), completion_ts.clone()))
+            })
+            .collect()
+    }
+
+    /// Get the "as of" timestamp for this view.
+    #[inline]
+    pub fn as_of_timestamp(&self) -> &str {
+        &self.as_of_timestamp
+    }
+
+    /// Get the file groups to exclude from the query.
+    #[inline]
+    pub fn excluding_file_groups(&self) -> &HashSet<FileGroup> {
+        &self.excluding_file_groups
+    }
+}
+
+impl CompletionTimeView for TimelineView {
+    fn get_completion_time(&self, request_timestamp: &str) -> Option<&str> {
+        self.request_to_completion
+            .get(request_timestamp)
+            .map(|s| s.as_str())
+    }
+
+    fn should_filter_uncommitted(&self) -> bool {
+        self.is_timeline_layout_v2
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::config::HudiConfigs;
+    use crate::timeline::instant::{Action, State};
+    fn create_instant(request_ts: &str, completion_ts: Option<&str>) -> Instant {
+        Instant {
+            timestamp: request_ts.to_string(),
+            completion_timestamp: completion_ts.map(|s| s.to_string()),
+            action: Action::Commit,
+            state: State::Completed,
+            epoch_millis: 0,
+        }
+    }
+
+    fn create_layout_v1_configs() -> Arc<HudiConfigs> {
+        Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "1")]))
+    }
+
+    fn create_layout_v2_configs() -> Arc<HudiConfigs> {
+        Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "2")]))
+    }
+
+    #[test]
+    fn test_snapshot_view_creation_layout_v2() {
+        let instants = vec![
+            create_instant("20240101120000000", Some("20240101120005000")),
+            create_instant("20240101130000000", Some("20240101130010000")),
+        ];
+        let configs = create_layout_v2_configs();
+
+        let view = TimelineView::new(
+            "20240101130000000".to_string(),
+            None,
+            &instants,
+            HashSet::new(),
+            &configs,
+        );
+
+        assert_eq!(view.as_of_timestamp(), "20240101130000000");
+        assert!(view.excluding_file_groups().is_empty());
+        // Layout v2 should have completion time map populated
+        assert!(view.should_filter_uncommitted());
+    }
+
+    #[test]
+    fn test_snapshot_view_creation_layout_v1() {
+        let instants = vec![
+            create_instant("20240101120000000", Some("20240101120005000")),
+            create_instant("20240101130000000", Some("20240101130010000")),
+        ];
+        let configs = create_layout_v1_configs();
+
+        let view = TimelineView::new(
+            "20240101130000000".to_string(),
+            None,
+            &instants,
+            HashSet::new(),
+            &configs,
+        );
+
+        assert_eq!(view.as_of_timestamp(), "20240101130000000");
+        // Layout v1 should NOT have completion time map (empty)
+        assert!(!view.should_filter_uncommitted());
+        assert!(view.get_completion_time("20240101120000000").is_none());
+    }
+
+    #[test]
+    fn test_completion_time_lookup_layout_v2() {
+        let instants = vec![
+            create_instant("20240101120000000", Some("20240101120005000")),
+            create_instant("20240101130000000", Some("20240101130010000")),
+            create_instant("20240101140000000", None), // Pending
+        ];
+        let configs = create_layout_v2_configs();
+
+        let view = TimelineView::new(
+            "20240101140000000".to_string(),
+            None,
+            &instants,
+            HashSet::new(),
+            &configs,
+        );
+
+        // Completed instants have completion time
+        assert_eq!(
+            view.get_completion_time("20240101120000000"),
+            Some("20240101120005000")
+        );
+        assert_eq!(
+            view.get_completion_time("20240101130000000"),
+            Some("20240101130010000")
+        );
+
+        // Pending instant has no completion time
+        assert!(view.get_completion_time("20240101140000000").is_none());
+
+        // Unknown timestamp returns None
+        assert!(view.get_completion_time("unknown").is_none());
+    }
+
+    #[test]
+    fn test_should_filter_uncommitted_layout_v2() {
+        let instants = vec![create_instant(
+            "20240101120000000",
+            Some("20240101120005000"),
+        )];
+        let configs = create_layout_v2_configs();
+
+        let view = TimelineView::new(
+            "20240101120000000".to_string(),
+            None,
+            &instants,
+            HashSet::new(),
+            &configs,
+        );
+
+        // Layout v2 should filter uncommitted
+        assert!(view.should_filter_uncommitted());
+    }
+
+    #[test]
+    fn test_should_not_filter_uncommitted_layout_v1() {
+        // Layout v1 - even with instants that have completion timestamps,
+        // the map is not built, so should_filter_uncommitted returns false
+        let instants = vec![create_instant(
+            "20240101120000000",
+            Some("20240101120005000"),
+        )];
+        let configs = create_layout_v1_configs();
+
+        let view = TimelineView::new(
+            "20240101120000000".to_string(),
+            None,
+            &instants,
+            HashSet::new(),
+            &configs,
+        );
+
+        // Layout v1 does not track completion time
+        assert!(!view.should_filter_uncommitted());
+    }
+
+    #[test]
+    fn test_excluding_file_groups() {
+        let instants: Vec<Instant> = vec![];
+        let configs = create_layout_v2_configs();
+        let mut excludes = HashSet::new();
+        excludes.insert(FileGroup::new("file-id-1".to_string(), "p1".to_string()));
+        excludes.insert(FileGroup::new("file-id-2".to_string(), "p2".to_string()));
+
+        let view = TimelineView::new(
+            "20240101120000000".to_string(),
+            None,
+            &instants,
+            excludes,
+            &configs,
+        );
+
+        assert_eq!(view.excluding_file_groups().len(), 2);
+    }
+}


Reply via email to