This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 23a3b4a  feat: support reading metadata table (files) (#499)
23a3b4a is described below

commit 23a3b4a8f092439b7d7b5b61de8f7980c18a3e66
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Dec 22 10:38:32 2025 -0600

    feat: support reading metadata table (files) (#499)
---
 crates/core/src/config/table.rs                   |  39 +++
 crates/core/src/file_group/file_slice.rs          |  11 +
 crates/core/src/file_group/reader.rs              | 349 ++++++++++++++++++++++
 crates/core/src/hfile/reader.rs                   | 265 ++++++++++++++++
 crates/core/src/metadata/mod.rs                   |   3 +
 crates/core/src/metadata/table_record.rs          |   3 +
 crates/core/src/storage/mod.rs                    |   2 +
 crates/core/src/table/mod.rs                      | 219 +++++++++++++-
 crates/core/src/table/validation.rs               |  71 ++++-
 crates/core/src/timeline/instant.rs               |  79 ++++-
 crates/core/src/util/mod.rs                       |   1 +
 crates/core/src/{metadata/mod.rs => util/path.rs} |  33 +-
 12 files changed, 1056 insertions(+), 19 deletions(-)

diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index e3fd273..bcaf3b9 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -128,6 +128,20 @@ pub enum HudiTableConfig {
     /// Path for LSM timeline history for layout v2, relative to timeline path 
(default: history)
     /// The full path will be `.hoodie/{TimelinePath}/{TimelineHistoryPath}`
     TimelineHistoryPath,
+
+    /// Enable the internal metadata table which serves table metadata like 
file listings.
+    ///
+    /// When enabled, file listings are read from the metadata table instead 
of storage,
+    /// which can significantly improve performance for tables with many 
partitions.
+    MetadataTableEnabled,
+
+    /// List of metadata table partitions enabled for this table.
+    ///
+    /// This config is read from the data table's hoodie.properties and 
specifies which
+    /// partitions are available in the metadata table (e.g., "files", 
"column_stats").
+    /// When creating a metadata table instance, this value should be passed 
as the
+    /// PartitionFields option.
+    MetadataTablePartitions,
 }
 
 impl AsRef<str> for HudiTableConfig {
@@ -155,6 +169,8 @@ impl AsRef<str> for HudiTableConfig {
             Self::ArchiveLogFolder => "hoodie.archivelog.folder",
             Self::TimelinePath => "hoodie.timeline.path",
             Self::TimelineHistoryPath => "hoodie.timeline.history.path",
+            Self::MetadataTableEnabled => "hoodie.metadata.enable",
+            Self::MetadataTablePartitions => 
"hoodie.table.metadata.partitions",
         }
     }
 }
@@ -175,6 +191,8 @@ impl ConfigParser for HudiTableConfig {
             )),
             Self::DatabaseName => 
Some(HudiConfigValue::String("default".to_string())),
             Self::DropsPartitionFields => 
Some(HudiConfigValue::Boolean(false)),
+            Self::IsHiveStylePartitioning => 
Some(HudiConfigValue::Boolean(false)),
+            Self::IsPartitionPathUrlencoded => 
Some(HudiConfigValue::Boolean(false)),
             Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
             Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
             Self::TimelineTimezone => Some(HudiConfigValue::String(
@@ -183,6 +201,8 @@ impl ConfigParser for HudiTableConfig {
             Self::ArchiveLogFolder => 
Some(HudiConfigValue::String(".hoodie/archived".to_string())),
             Self::TimelinePath => 
Some(HudiConfigValue::String("timeline".to_string())),
             Self::TimelineHistoryPath => 
Some(HudiConfigValue::String("history".to_string())),
+            Self::MetadataTableEnabled => 
Some(HudiConfigValue::Boolean(false)),
+            Self::MetadataTablePartitions => 
Some(HudiConfigValue::List(vec![])),
             _ => None,
         }
     }
@@ -258,6 +278,13 @@ impl ConfigParser for HudiTableConfig {
             Self::ArchiveLogFolder => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::TimelinePath => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::TimelineHistoryPath => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::MetadataTableEnabled => get_result
+                .and_then(|v| {
+                    bool::from_str(v).map_err(|e| ParseBool(self.key(), 
v.to_string(), e))
+                })
+                .map(HudiConfigValue::Boolean),
+            Self::MetadataTablePartitions => get_result
+                .map(|v| 
HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
         }
     }
 
@@ -323,6 +350,9 @@ impl FromStr for TableTypeValue {
 pub enum BaseFileFormatValue {
     #[strum(serialize = "parquet")]
     Parquet,
+    /// HFile format - only valid for metadata tables.
+    #[strum(serialize = "hfile")]
+    HFile,
 }
 
 impl FromStr for BaseFileFormatValue {
@@ -331,6 +361,7 @@ impl FromStr for BaseFileFormatValue {
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         match s.to_ascii_lowercase().as_str() {
             "parquet" => Ok(Self::Parquet),
+            "hfile" => Ok(Self::HFile),
             "orc" => Err(UnsupportedValue(s.to_string())),
             v => Err(InvalidValue(v.to_string())),
         }
@@ -423,6 +454,14 @@ mod tests {
             BaseFileFormatValue::from_str("PArquet").unwrap(),
             BaseFileFormatValue::Parquet
         );
+        assert_eq!(
+            BaseFileFormatValue::from_str("hfile").unwrap(),
+            BaseFileFormatValue::HFile
+        );
+        assert_eq!(
+            BaseFileFormatValue::from_str("HFILE").unwrap(),
+            BaseFileFormatValue::HFile
+        );
         assert!(matches!(
             BaseFileFormatValue::from_str("").unwrap_err(),
             InvalidValue(_)
diff --git a/crates/core/src/file_group/file_slice.rs 
b/crates/core/src/file_group/file_slice.rs
index 7218188..c81b62e 100644
--- a/crates/core/src/file_group/file_slice.rs
+++ b/crates/core/src/file_group/file_slice.rs
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+use crate::config::table::BaseFileFormatValue;
 use crate::error::CoreError;
 use crate::file_group::base_file::BaseFile;
 use crate::file_group::log_file::LogFile;
@@ -112,12 +113,22 @@ impl FileSlice {
 
     /// Load [FileMetadata] from storage layer for the [BaseFile] if 
`file_metadata` is [None]
     /// or if `file_metadata` is not fully populated.
+    ///
+    /// This only loads metadata for Parquet files. For non-Parquet files 
(e.g., HFile),
+    /// this is a no-op since Parquet-specific metadata reading would fail.
+    /// TODO: see if mdt read would benefit from loading hfile metadata as 
well.
     pub async fn load_metadata_if_needed(&mut self, storage: &Storage) -> 
Result<()> {
+        // Skip non-Parquet files - metadata loading uses Parquet-specific APIs
+        if self.base_file.extension != BaseFileFormatValue::Parquet.as_ref() {
+            return Ok(());
+        }
+
         if let Some(metadata) = &self.base_file.file_metadata {
             if metadata.fully_populated {
                 return Ok(());
             }
         }
+
         let relative_path = self.base_file_relative_path()?;
         let fetched_metadata = 
storage.get_file_metadata(&relative_path).await?;
         self.base_file.file_metadata = Some(fetched_metadata);
diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 7f41c0b..1048662 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -26,8 +26,11 @@ use crate::expr::filter::{Filter, SchemableFilter};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::log_file::scanner::{LogFileScanner, ScanResult};
 use crate::file_group::record_batches::RecordBatches;
+use crate::hfile::HFileReader;
 use crate::merge::record_merger::RecordMerger;
+use crate::metadata::merger::FilesPartitionMerger;
 use crate::metadata::meta_field::MetaField;
+use crate::metadata::table_record::FilesPartitionRecord;
 use crate::storage::Storage;
 use crate::table::builder::OptionResolver;
 use crate::timeline::selector::InstantRange;
@@ -36,6 +39,7 @@ use arrow::compute::and;
 use arrow::compute::filter_record_batch;
 use arrow_array::{BooleanArray, RecordBatch};
 use futures::TryFutureExt;
+use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::sync::Arc;
 
@@ -320,6 +324,158 @@ impl FileGroupReader {
             .build()?
             .block_on(self.read_file_slice_from_paths(base_file_path, 
log_file_paths))
     }
+
+    // 
=========================================================================
+    // Metadata Table (MDT) File Slice Reading
+    // 
=========================================================================
+
+    /// Check if this reader is configured for a metadata table.
+    ///
+    /// Detection is based on the base path ending with `.hoodie/metadata`.
+    pub fn is_metadata_table(&self) -> bool {
+        let base_path: String = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::BasePath)
+            .into();
+        crate::util::path::is_metadata_table_path(&base_path)
+    }
+
+    /// Reads a metadata table file slice and returns merged 
FilesPartitionRecords.
+    ///
+    /// This method is specifically designed for reading the metadata table's 
`files` partition,
+    /// which uses HFile format for base files and HFile blocks in log files.
+    ///
+    /// # Arguments
+    /// * `base_file_path` - Relative path to the HFile base file
+    /// * `log_file_paths` - Relative paths to log files
+    ///
+    /// # Returns
+    /// A HashMap mapping record keys (partition paths or 
"__all_partitions__") to
+    /// merged `FilesPartitionRecord`s containing file information.
+    ///
+    /// # Example
+    /// ```ignore
+    /// let reader = FileGroupReader::new_with_options(mdt_base_uri, options)?;
+    /// let merged = reader.read_file_slice_from_mdt_paths(
+    ///     "files/files-0000-0_0-0-0_00000000000000.hfile",
+    ///     vec!["files/.files-0000-0_20240101120000000.log.1_0-100-200"],
+    /// ).await?;
+    ///
+    /// // Get file listing for a partition
+    /// if let Some(record) = merged.get("city=chennai") {
+    ///     for file_name in record.active_file_names() {
+    ///         println!("Active file: {}", file_name);
+    ///     }
+    /// }
+    /// ```
+    pub async fn read_file_slice_from_mdt_paths<I, S>(
+        &self,
+        base_file_path: &str,
+        log_file_paths: I,
+    ) -> Result<HashMap<String, FilesPartitionRecord>>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        let log_file_paths: Vec<String> = log_file_paths
+            .into_iter()
+            .map(|s| s.as_ref().to_string())
+            .collect();
+
+        // Read HFile base file using async open
+        let mut hfile_reader = HFileReader::open(&self.storage, base_file_path)
+            .await
+            .map_err(|e| {
+                ReadFileSliceError(format!(
+                    "Failed to read MDT base file {}: {:?}",
+                    base_file_path, e
+                ))
+            })?;
+
+        // Get Avro schema from HFile
+        let schema = hfile_reader
+            .get_avro_schema()
+            .map_err(|e| ReadFileSliceError(format!("Failed to get Avro 
schema: {:?}", e)))?
+            .ok_or_else(|| ReadFileSliceError("No Avro schema found in 
HFile".to_string()))?
+            .clone();
+
+        // Collect base records
+        let base_records = hfile_reader
+            .collect_records()
+            .map_err(|e| ReadFileSliceError(format!("Failed to collect HFile 
records: {:?}", e)))?;
+
+        // Scan log files if present
+        let log_records = if log_file_paths.is_empty() {
+            vec![]
+        } else {
+            let instant_range = self.create_instant_range_for_log_file_scan();
+            let scan_result = LogFileScanner::new(self.hudi_configs.clone(), 
self.storage.clone())
+                .scan(log_file_paths, &instant_range)
+                .await?;
+
+            match scan_result {
+                ScanResult::HFileRecords(records) => records,
+                ScanResult::Empty => vec![],
+                ScanResult::RecordBatches(_) => {
+                    return Err(CoreError::LogBlockError(
+                        "Unexpected RecordBatches in metadata table log 
file".to_string(),
+                    ));
+                }
+            }
+        };
+
+        // Merge base and log records
+        let merger = FilesPartitionMerger::new(schema);
+        merger.merge(&base_records, &log_records)
+    }
+
+    /// Same as [FileGroupReader::read_file_slice_from_mdt_paths], but 
blocking.
+    pub fn read_file_slice_from_mdt_paths_blocking<I, S>(
+        &self,
+        base_file_path: &str,
+        log_file_paths: I,
+    ) -> Result<HashMap<String, FilesPartitionRecord>>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(self.read_file_slice_from_mdt_paths(base_file_path, 
log_file_paths))
+    }
+
+    /// Reads a metadata table file slice using a FileSlice object.
+    ///
+    /// Convenience wrapper around 
[FileGroupReader::read_file_slice_from_mdt_paths].
+    pub async fn read_file_slice_from_mdt(
+        &self,
+        file_slice: &FileSlice,
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        let base_file_path = file_slice.base_file_relative_path()?;
+        let log_file_paths = if file_slice.has_log_file() {
+            file_slice
+                .log_files
+                .iter()
+                .map(|log_file| file_slice.log_file_relative_path(log_file))
+                .collect::<Result<Vec<String>>>()?
+        } else {
+            vec![]
+        };
+        self.read_file_slice_from_mdt_paths(&base_file_path, log_file_paths)
+            .await
+    }
+
+    /// Same as [FileGroupReader::read_file_slice_from_mdt], but blocking.
+    pub fn read_file_slice_from_mdt_blocking(
+        &self,
+        file_slice: &FileSlice,
+    ) -> Result<HashMap<String, FilesPartitionRecord>> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(self.read_file_slice_from_mdt(file_slice))
+    }
 }
 
 #[cfg(test)]
@@ -631,4 +787,197 @@ mod tests {
 
         Ok(())
     }
+
+    // 
=========================================================================
+    // MDT File Slice Reading Tests
+    // 
=========================================================================
+
+    fn get_metadata_table_base_uri() -> String {
+        use hudi_test::QuickstartTripsTable;
+        let table_path = 
QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
+        let mdt_path = 
PathBuf::from(table_path).join(".hoodie").join("metadata");
+        let url = 
Url::from_file_path(canonicalize(&mdt_path).unwrap()).unwrap();
+        url.as_ref().to_string()
+    }
+
+    /// Create a FileGroupReader for MDT without trying to resolve options 
from storage.
+    fn create_mdt_reader() -> Result<FileGroupReader> {
+        let mdt_uri = get_metadata_table_base_uri();
+        let hudi_configs = Arc::new(HudiConfigs::new([(
+            HudiTableConfig::BasePath,
+            mdt_uri.as_str(),
+        )]));
+        
FileGroupReader::new_with_configs_and_overwriting_options(hudi_configs, 
empty_options())
+    }
+
+    #[test]
+    fn test_is_metadata_table_detection() -> Result<()> {
+        // Regular table should return false
+        let base_uri = get_base_uri_with_valid_props();
+        let reader = FileGroupReader::new_with_options(&base_uri, 
empty_options())?;
+        assert!(!reader.is_metadata_table());
+
+        // Metadata table should return true
+        let mdt_reader = create_mdt_reader()?;
+        assert!(mdt_reader.is_metadata_table());
+
+        Ok(())
+    }
+
+    /// Initial HFile base file for the files partition (all zeros timestamp).
+    const MDT_FILES_BASE_FILE: &str = 
"files/files-0000-0_0-955-2690_00000000000000000.hfile";
+
+    /// Log files for the V8Trips8I3U1D test table's files partition.
+    const MDT_FILES_LOG_FILES: &[&str] = &[
+        "files/.files-0000-0_20251220210108078.log.1_10-999-2838",
+        "files/.files-0000-0_20251220210123755.log.1_3-1032-2950",
+        "files/.files-0000-0_20251220210125441.log.1_5-1057-3024",
+        "files/.files-0000-0_20251220210127080.log.1_3-1082-3100",
+        "files/.files-0000-0_20251220210128625.log.1_5-1107-3174",
+        "files/.files-0000-0_20251220210129235.log.1_3-1118-3220",
+        "files/.files-0000-0_20251220210130911.log.1_3-1149-3338",
+    ];
+
+    #[test]
+    fn test_read_file_slice_from_mdt_paths_without_log_files() -> Result<()> {
+        use crate::metadata::table_record::MetadataRecordType;
+
+        let reader = create_mdt_reader()?;
+
+        // Read base file only (no log files)
+        let log_files: Vec<&str> = vec![];
+        let merged =
+            
reader.read_file_slice_from_mdt_paths_blocking(MDT_FILES_BASE_FILE, log_files)?;
+
+        // Initial base file only has __all_partitions__ record
+        // City partition records are added through log files
+        assert_eq!(merged.len(), 1, "Base file should have 1 key");
+        assert!(merged.contains_key("__all_partitions__"));
+
+        // Validate __all_partitions__ record
+        let all_parts = merged.get("__all_partitions__").unwrap();
+        assert_eq!(all_parts.record_type, MetadataRecordType::AllPartitions);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_read_file_slice_from_mdt_paths_with_log_files() -> Result<()> {
+        use crate::metadata::table_record::MetadataRecordType;
+
+        let reader = create_mdt_reader()?;
+
+        // Read base file + all log files
+        let merged = reader.read_file_slice_from_mdt_paths_blocking(
+            MDT_FILES_BASE_FILE,
+            MDT_FILES_LOG_FILES.to_vec(),
+        )?;
+
+        // Should have 4 keys after merging: __all_partitions__ + 3 city partitions
+        assert_eq!(merged.len(), 4, "Should have 4 partition keys after 
merge");
+
+        // Validate all partition keys have correct record types
+        for (key, record) in &merged {
+            if key == "__all_partitions__" {
+                assert_eq!(record.record_type, 
MetadataRecordType::AllPartitions);
+            } else {
+                assert_eq!(record.record_type, MetadataRecordType::Files);
+            }
+        }
+
+        // Expected UUIDs for each partition's files
+        const CHENNAI_UUID: &str = "6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc";
+        const SAN_FRANCISCO_UUID: &str = 
"036ded81-9ed4-479f-bcea-7145dfa0079b";
+        const SAO_PAULO_UUID: &str = "8aa68f7e-afd6-4c94-b86c-8a886552e08d";
+
+        // Validate chennai partition
+        let chennai = merged.get("city=chennai").unwrap();
+        let active_files = chennai.active_file_names();
+        assert!(
+            active_files.len() >= 2,
+            "Chennai should have at least 2 active files, got {}",
+            active_files.len()
+        );
+        assert!(chennai.total_size() > 0, "Total size should be > 0");
+        for file_name in &active_files {
+            assert!(
+                file_name.contains(CHENNAI_UUID),
+                "Chennai file should contain UUID: {}",
+                file_name
+            );
+        }
+
+        // Validate san_francisco partition
+        let sf = merged.get("city=san_francisco").unwrap();
+        for file_name in sf.active_file_names() {
+            assert!(
+                file_name.contains(SAN_FRANCISCO_UUID),
+                "San Francisco file should contain UUID: {}",
+                file_name
+            );
+        }
+
+        // Validate sao_paulo partition
+        let sp = merged.get("city=sao_paulo").unwrap();
+        for file_name in sp.active_file_names() {
+            assert!(
+                file_name.contains(SAO_PAULO_UUID),
+                "Sao Paulo file should contain UUID: {}",
+                file_name
+            );
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_read_file_slice_from_mdt_error_handling() -> Result<()> {
+        let reader = create_mdt_reader()?;
+
+        // Test with non-existent base file
+        let result = reader
+            
.read_file_slice_from_mdt_paths_blocking("files/nonexistent.hfile", 
Vec::<&str>::new());
+
+        assert!(result.is_err(), "Should error on non-existent file");
+        let err = result.unwrap_err().to_string();
+        assert!(
+            err.contains("Failed to read MDT base file"),
+            "Error should mention MDT base file: {}",
+            err
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_read_file_slice_from_mdt_blocking() -> Result<()> {
+        use crate::file_group::FileGroup;
+
+        let reader = create_mdt_reader()?;
+
+        // Build FileGroup using the API
+        let mut fg = FileGroup::new("files-0000-0".to_string(), 
"files".to_string());
+        let base_file_name = 
MDT_FILES_BASE_FILE.strip_prefix("files/").unwrap();
+        fg.add_base_file_from_name(base_file_name)?;
+        let log_file_names: Vec<_> = MDT_FILES_LOG_FILES
+            .iter()
+            .map(|s| s.strip_prefix("files/").unwrap())
+            .collect();
+        fg.add_log_files_from_names(log_file_names)?;
+
+        let file_slice = fg
+            .get_file_slice_as_of("99999999999999999")
+            .expect("Should have file slice");
+
+        let merged = reader.read_file_slice_from_mdt_blocking(file_slice)?;
+
+        // Should have 4 keys: __all_partitions__ + 3 city partitions
+        assert_eq!(merged.len(), 4);
+        assert!(merged.contains_key("__all_partitions__"));
+        assert!(merged.contains_key("city=chennai"));
+        assert!(merged.contains_key("city=san_francisco"));
+        assert!(merged.contains_key("city=sao_paulo"));
+
+        Ok(())
+    }
 }
diff --git a/crates/core/src/hfile/reader.rs b/crates/core/src/hfile/reader.rs
index ac06d38..46a6846 100644
--- a/crates/core/src/hfile/reader.rs
+++ b/crates/core/src/hfile/reader.rs
@@ -30,6 +30,7 @@ use crate::hfile::key::{compare_keys, Key, KeyValue, Utf8Key};
 use crate::hfile::proto::InfoProto;
 use crate::hfile::record::HFileRecord;
 use crate::hfile::trailer::HFileTrailer;
+use crate::storage::Storage;
 use apache_avro::Schema as AvroSchema;
 use prost::Message;
 use std::cell::OnceCell;
@@ -132,6 +133,29 @@ impl HFileReader {
         Ok(reader)
     }
 
+    /// Open an HFile from storage.
+    ///
+    /// This is an async factory method that reads the file from storage
+    /// and creates an HFileReader.
+    ///
+    /// # Arguments
+    /// * `storage` - The storage to read from
+    /// * `relative_path` - The relative path to the HFile
+    ///
+    /// # Example
+    /// ```ignore
+    /// let mut reader = HFileReader::open(&storage, "files/data.hfile").await?;
+    /// for record in reader.iter()? {
+    ///     println!("{:?}", record?);
+    /// }
+    /// ```
+    pub async fn open(storage: &Storage, relative_path: &str) -> Result<Self> {
+        let bytes = storage.get_file_data(relative_path).await.map_err(|e| {
+            HFileError::InvalidFormat(format!("Failed to read HFile {}: {:?}", 
relative_path, e))
+        })?;
+        Self::new(bytes.to_vec())
+    }
+
     /// Initialize metadata by reading index blocks and file info.
     fn initialize_metadata(&mut self) -> Result<()> {
         // Read the "load-on-open" section starting from 
load_on_open_data_offset
@@ -2269,4 +2293,245 @@ mod tests {
         );
         assert!(total_size > 0, "Total size across partitions should be > 0");
     }
+
+    // ================== File Info and Meta Block Tests ==================
+
+    #[test]
+    fn test_get_file_info_last_key() {
+        let bytes = 
read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // LASTKEY should be present in file info
+        let last_key = reader.get_file_info("hfile.LASTKEY");
+        assert!(last_key.is_some(), "LASTKEY should be present");
+
+        // LASTKEY holds the structured key bytes; only check that it is non-empty
+        let last_key_bytes = last_key.unwrap();
+        assert!(!last_key_bytes.is_empty(), "LASTKEY should not be empty");
+    }
+
+    #[test]
+    fn test_get_file_info_not_found() {
+        let bytes = 
read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Non-existent key should return None
+        let result = reader.get_file_info("nonexistent.key");
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn test_get_avro_schema_from_metadata_hfile() {
+        let bytes = read_metadata_table_hfile();
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Metadata table HFiles should have embedded Avro schema
+        let schema = reader.get_avro_schema().expect("Failed to get schema");
+        assert!(schema.is_some(), "Metadata HFile should have Avro schema");
+
+        let avro_schema = schema.unwrap();
+        // Schema should be a record type for HoodieMetadataRecord
+        assert!(
+            matches!(avro_schema, AvroSchema::Record(_)),
+            "Schema should be a record type"
+        );
+    }
+
+    #[test]
+    fn test_get_avro_schema_regular_hfile() {
+        // Regular test HFiles don't have Avro schema in file info
+        let bytes = 
read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        let schema = reader.get_avro_schema().expect("Failed to get schema");
+        // Regular HFiles typically don't have embedded Avro schema
+        assert!(
+            schema.is_none(),
+            "Regular HFile should not have Avro schema"
+        );
+    }
+
+    #[test]
+    fn test_read_min_max_record_keys_from_metadata_hfile() {
+        let bytes = read_metadata_table_hfile();
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Metadata table HFiles may carry min/max record keys
+        let result = reader.read_min_max_record_keys();
+
+        // The metadata HFile may or may not have these keys depending on how 
it was created
+        // If present, verify the structure
+        if let Some((min_key, max_key)) = result {
+            assert!(!min_key.is_empty(), "Min key should not be empty");
+            assert!(!max_key.is_empty(), "Max key should not be empty");
+            // Min should be <= Max lexicographically
+            assert!(
+                min_key <= max_key,
+                "Min key should be <= Max key: {} vs {}",
+                min_key,
+                max_key
+            );
+        }
+    }
+
+    #[test]
+    fn test_read_min_max_record_keys_regular_hfile() {
+        let bytes = 
read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+        // Regular test HFiles typically don't have min/max record keys
+        let result = reader.read_min_max_record_keys();
+        assert!(
+            result.is_none(),
+            "Regular HFile should not have min/max record keys"
+        );
+    }
+
+    // ================== Error Handling Tests ==================
+
+    #[test]
+    fn test_invalid_hfile_too_small() {
+        // File too small to contain a valid trailer
+        let bytes = vec![0u8; 10];
+        let result = HFileReader::new(bytes);
+        assert!(result.is_err(), "Should fail for file too small");
+    }
+
+    #[test]
+    fn test_invalid_hfile_bad_magic() {
+        // Create a file with wrong magic bytes at the end
+        let mut bytes = vec![0u8; 100];
+        // HFile trailer magic is at the end - put garbage there
+        bytes[96..100].copy_from_slice(b"BAAD");
+        let result = HFileReader::new(bytes);
+        assert!(result.is_err(), "Should fail for invalid magic");
+    }
+
+    // ================== Multi-Block Iteration Tests ==================
+
+    #[test]
+    fn test_iterate_across_multiple_blocks() {
+        // Use GZIP file with 20000 entries - spans multiple blocks
+        let bytes = 
read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create 
reader");
+
+        let mut count = 0;
+        let mut prev_key: Option<String> = None;
+
+        for result in reader.iter().expect("Failed to create iterator") {
+            let kv = result.expect("Failed to read kv");
+            let key = kv.key().content_as_str().unwrap().to_string();
+
+            // Verify keys are in ascending order
+            if let Some(ref prev) = prev_key {
+                assert!(
+                    key > *prev,
+                    "Keys should be in ascending order: {} > {}",
+                    key,
+                    prev
+                );
+            }
+            prev_key = Some(key);
+            count += 1;
+        }
+
+        assert_eq!(count, 20000, "Should iterate all 20000 entries");
+    }
+
+    #[test]
+    fn test_seek_across_block_boundaries() {
+        // Use 512KB blocks with 20000 entries
+        let bytes = 
read_test_hfile("hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create 
reader");
+
+        reader.seek_to_first().expect("Failed to seek");
+
+        // Seek to various keys that likely span different blocks
+        let test_keys = [
+            "hudi-key-000000000", // First key
+            "hudi-key-000005000", // Middle
+            "hudi-key-000010000", // Another block
+            "hudi-key-000015000", // Another block
+            "hudi-key-000019999", // Last key
+        ];
+
+        for expected_key in test_keys {
+            let lookup = Utf8Key::new(expected_key);
+            let result = reader.seek_to(&lookup).expect("Failed to seek");
+            assert_eq!(
+                result,
+                SeekResult::Found,
+                "Should find key: {}",
+                expected_key
+            );
+
+            let kv = reader.get_key_value().expect("Failed to get 
kv").unwrap();
+            assert_eq!(
+                kv.key().content_as_str().unwrap(),
+                expected_key,
+                "Key mismatch"
+            );
+        }
+    }
+
+    #[test]
+    fn test_next_at_eof() {
+        let bytes = 
read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create 
reader");
+
+        // Seek to last key
+        reader.seek_to_first().expect("Failed to seek");
+        let lookup = Utf8Key::new("hudi-key-000004999");
+        let result = reader.seek_to(&lookup).expect("Failed to seek");
+        assert_eq!(result, SeekResult::Found);
+
+        // next() should return false at EOF
+        assert!(!reader.next().expect("Failed to next"));
+
+        // get_key_value should return None after EOF
+        assert!(reader.get_key_value().expect("Failed to get kv").is_none());
+    }
+
+    #[test]
+    fn test_collect_records_gzip() {
+        let bytes = 
read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create 
reader");
+
+        let records = reader.collect_records().expect("Failed to collect 
records");
+        assert_eq!(records.len(), 20000);
+
+        // Verify first and last records
+        assert_eq!(records[0].key_as_str(), Some("hudi-key-000000000"));
+        assert_eq!(records[19999].key_as_str(), Some("hudi-key-000019999"));
+    }
+
+    #[test]
+    fn test_lookup_records_across_blocks() {
+        let bytes = 
read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
+        let mut reader = HFileReader::new(bytes).expect("Failed to create 
reader");
+
+        // Look up keys that span different blocks
+        let keys = vec![
+            "hudi-key-000000000",
+            "hudi-key-000005000",
+            "hudi-key-000010000",
+            "hudi-key-000015000",
+            "hudi-key-000019999",
+            "hudi-key-nonexistent",
+        ];
+
+        let results = reader.lookup_records(&keys).expect("Failed to lookup");
+        assert_eq!(results.len(), 6);
+
+        // First 5 should be found
+        for (key, value) in results.iter().take(5) {
+            assert!(value.is_some(), "Key {} should be found", key);
+        }
+
+        // Last one should not be found
+        assert!(
+            results[5].1.is_none(),
+            "Nonexistent key should not be found"
+        );
+    }
 }
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
index e1c326b..6fe247c 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/metadata/mod.rs
@@ -31,3 +31,6 @@ pub const LAKE_FORMAT_METADATA_DIRS: &[&str; 3] = &[
     DELTALAKE_METADATA_DIR,
     ICEBERG_METADATA_DIR,
 ];
+
+/// Virtual partition field of metadata tables; its values name MDT partitions (e.g. `files`).
+pub const MDT_PARTITION_FIELD: &str = "partition";
diff --git a/crates/core/src/metadata/table_record.rs 
b/crates/core/src/metadata/table_record.rs
index cabb393..400116e 100644
--- a/crates/core/src/metadata/table_record.rs
+++ b/crates/core/src/metadata/table_record.rs
@@ -121,6 +121,9 @@ pub struct FilesPartitionRecord {
 }
 
 impl FilesPartitionRecord {
+    /// Name of the metadata table partition that stores the data table's file listings.
+    pub const PARTITION_NAME: &'static str = "files";
+
     /// Check if this is an ALL_PARTITIONS record.
     pub fn is_all_partitions(&self) -> bool {
         self.record_type == MetadataRecordType::AllPartitions
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index deb5963..d1f4d35 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -106,6 +106,7 @@ impl Storage {
     }
 
     #[cfg(test)]
+    /// Get basic file metadata (name, size) without loading the file content.
     async fn get_file_metadata_not_populated(&self, relative_path: &str) -> 
Result<FileMetadata> {
         let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
@@ -116,6 +117,7 @@ impl Storage {
         Ok(FileMetadata::new(name.to_string(), meta.size))
     }
 
+    /// Get full file metadata for a Parquet file, including record counts 
from Parquet metadata.
     pub async fn get_file_metadata(&self, relative_path: &str) -> 
Result<FileMetadata> {
         let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7081a64..2c72a82 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -94,13 +94,17 @@ pub mod partition;
 mod validation;
 
 use crate::config::read::HudiReadConfig;
-use crate::config::table::HudiTableConfig::PartitionFields;
+use crate::config::table::HudiTableConfig::{MetadataTablePartitions, 
PartitionFields};
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
+use crate::error::CoreError;
 use crate::expr::filter::{from_str_tuples, Filter};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
+use crate::metadata::table_record::FilesPartitionRecord;
+use crate::metadata::MDT_PARTITION_FIELD;
 use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
+use crate::storage::util::join_url_segments;
 use crate::table::builder::TableBuilder;
 use crate::table::fs_view::FileSystemView;
 use crate::table::partition::PartitionPruner;
@@ -213,6 +217,64 @@ impl Table {
         self.table_type() == TableTypeValue::MergeOnRead.as_ref()
     }
 
+    /// Check if this table is a metadata table.
+    ///
+    /// Detection is purely path-based: the configured base path must end with `.hoodie/metadata`.
+    pub fn is_metadata_table(&self) -> bool {
+        let base_path: String = self
+            .hudi_configs
+            .get_or_default(HudiTableConfig::BasePath)
+            .into();
+        crate::util::path::is_metadata_table_path(&base_path) // helper ignores trailing `/`
+    }
+
+    /// Get the list of available metadata table partitions for this table.
+    ///
+    /// Returns the partitions configured in 
`hoodie.table.metadata.partitions`.
+    pub fn get_metadata_table_partitions(&self) -> Vec<String> {
+        self.hudi_configs
+            .get_or_default(MetadataTablePartitions) // presumably the config default when the key is unset — confirm
+            .into()
+    }
+
+    /// Create a metadata table instance for this data table.
+    ///
+    /// Uses all partitions from `hoodie.table.metadata.partitions` 
configuration.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the metadata table cannot be created or if there 
are
+    /// no metadata table partitions configured, or if `self` is already a metadata table.
+    pub async fn new_metadata_table(&self) -> Result<Table> {
+        if self.is_metadata_table() {
+            return Err(CoreError::MetadataTable(
+                "Cannot create metadata table from another metadata 
table".to_string(),
+            ));
+        }
+
+        let mdt_partitions = self.get_metadata_table_partitions(); // only checked for emptiness; not passed on
+        if mdt_partitions.is_empty() {
+            return Err(CoreError::MetadataTable(
+                "No metadata table partitions configured".to_string(),
+            ));
+        }
+
+        let mdt_url = join_url_segments(&self.base_url(), &[".hoodie", 
"metadata"])?;
+        Table::new_with_options(
+            mdt_url.as_str(),
+            [(PartitionFields.as_ref(), MDT_PARTITION_FIELD)], // the MDT's only partition field is the virtual one
+        )
+        .await
+    }
+
+    /// Same as [Table::new_metadata_table], but blocking.
+    pub fn new_metadata_table_blocking(&self) -> Result<Table> {
+        tokio::runtime::Builder::new_current_thread() // builds a fresh current-thread runtime on every call
+            .enable_all()
+            .build()?
+            .block_on(async { self.new_metadata_table().await }) // NOTE(review): presumably unsafe to call from async code (nested `block_on`) — confirm
+    }
+
     pub fn timezone(&self) -> String {
         self.hudi_configs
             .get_or_default(HudiTableConfig::TimelineTimezone)
@@ -260,7 +322,22 @@ impl Table {
     }
 
     /// Get the latest partition [arrow_schema::Schema] of the table.
+    ///
+    /// For metadata tables, returns a schema with a single `partition` field
+    /// typed as [arrow_schema::DataType::Utf8], since MDT uses a single 
partition
+    /// column to identify partitions like "files", "column_stats", etc.
+    ///
+    /// For regular tables, returns the partition fields with their actual 
data types
+    /// derived from the table schema.
     pub async fn get_partition_schema(&self) -> Result<Schema> {
+        if self.is_metadata_table() {
+            return Ok(Schema::new(vec![Field::new(
+                MDT_PARTITION_FIELD,
+                arrow_schema::DataType::Utf8,
+                false,
+            )]));
+        }
+
         let partition_fields: HashSet<String> = {
             let fields: Vec<String> = 
self.hudi_configs.get_or_default(PartitionFields).into();
             fields.into_iter().collect()
@@ -609,6 +686,56 @@ impl Table {
         )
     }
 
+    /// Read records from the "files" partition of the metadata table.
+    ///
+    /// This method can only be called on metadata tables. It reads all records
+    /// from the "files" partition and returns merged `FilesPartitionRecord`s.
+    ///
+    /// # Returns
+    /// A HashMap from record key to `FilesPartitionRecord`; empty if no latest commit timestamp exists.
+    pub async fn read_metadata_files(&self) -> Result<HashMap<String, 
FilesPartitionRecord>> {
+        if !self.is_metadata_table() {
+            return Err(CoreError::MetadataTable(
+                "read_metadata_files can only be called on metadata 
tables".to_string(),
+            ));
+        }
+
+        let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() else {
+            return Ok(HashMap::new()); // the timeline offers no latest commit timestamp: nothing to read
+        };
+
+        let filters = from_str_tuples([(
+            MDT_PARTITION_FIELD,
+            "=",
+            FilesPartitionRecord::PARTITION_NAME,
+        )])?;
+        let file_slices = self.get_file_slices_internal(timestamp, 
&filters).await?;
+
+        if file_slices.len() != 1 { // zero or several file slices are both reported as an error
+            return Err(CoreError::MetadataTable(format!(
+                "Expected 1 file slice for {} partition, got {}",
+                FilesPartitionRecord::PARTITION_NAME,
+                file_slices.len()
+            )));
+        }
+        let file_slice = &file_slices[0];
+
+        let fg_reader = self.create_file_group_reader_with_options([(
+            HudiReadConfig::FileGroupEndTimestamp,
+            timestamp, // same instant that was used for the file-slice listing above
+        )])?;
+
+        fg_reader.read_file_slice_from_mdt(file_slice).await
+    }
+
+    /// Same as [Table::read_metadata_files], but blocking.
+    pub fn read_metadata_files_blocking(&self) -> Result<HashMap<String, 
FilesPartitionRecord>> {
+        tokio::runtime::Builder::new_current_thread() // builds a fresh current-thread runtime on every call
+            .enable_all()
+            .build()?
+            .block_on(self.read_metadata_files()) // NOTE(review): presumably unsafe to call from async code (nested `block_on`) — confirm
+    }
+
     /// Get all the latest records in the table.
     ///
     /// # Arguments
@@ -770,6 +897,7 @@ mod tests {
     use crate::config::HUDI_CONF_DIR;
     use crate::error::CoreError;
     use crate::metadata::meta_field::MetaField;
+    use crate::metadata::table_record::MetadataRecordType;
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
     use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq, 
SampleTable};
@@ -1017,8 +1145,10 @@ mod tests {
         assert_eq!(actual, "default");
         let actual: bool = configs.get_or_default(DropsPartitionFields).into();
         assert!(!actual);
-        assert!(panic::catch_unwind(|| 
configs.get_or_default(IsHiveStylePartitioning)).is_err());
-        assert!(panic::catch_unwind(|| 
configs.get_or_default(IsPartitionPathUrlencoded)).is_err());
+        let actual: bool = 
configs.get_or_default(IsHiveStylePartitioning).into();
+        assert!(!actual);
+        let actual: bool = 
configs.get_or_default(IsPartitionPathUrlencoded).into();
+        assert!(!actual);
         assert!(panic::catch_unwind(|| 
configs.get_or_default(KeyGeneratorClass)).is_err());
         let actual: Vec<String> = 
configs.get_or_default(PartitionFields).into();
         assert!(actual.is_empty());
@@ -1440,4 +1570,87 @@ mod tests {
         let expected = HashSet::new();
         assert_eq!(actual, expected);
     }
+
+    // 
=========================================================================
+    // Metadata Table Tests
+    // 
=========================================================================
+
+    fn get_data_table() -> Table {
+        use hudi_test::QuickstartTripsTable;
+        let table_path = 
QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
+        Table::new_blocking(&table_path).unwrap() // test helper: panics if the fixture table cannot be opened
+    }
+
+    #[test]
+    fn hudi_table_read_metadata_files() {
+        let data_table = get_data_table();
+        let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+
+        assert!(metadata_table.is_metadata_table());
+
+        let records = metadata_table.read_metadata_files_blocking().unwrap();
+
+        // Should have 4 records: __all_partitions__ + 3 city partitions
+        assert_eq!(records.len(), 4);
+
+        // The __all_partitions__ record must list the three city partitions (compared as a set)
+        let all_partitions = records.get("__all_partitions__").unwrap();
+        assert_eq!(
+            all_partitions.record_type,
+            MetadataRecordType::AllPartitions
+        );
+        let partition_names: HashSet<_> = 
all_partitions.partition_names().into_iter().collect();
+        assert_eq!(
+            partition_names,
+            HashSet::from(["city=chennai", "city=san_francisco", 
"city=sao_paulo"])
+        );
+
+        // Validate the city=chennai record: two `.parquet` files and two `.log.1_*` files are active
+        let chennai = records.get("city=chennai").unwrap();
+        assert_eq!(chennai.record_type, MetadataRecordType::Files);
+        let chennai_files: HashSet<_> = 
chennai.active_file_names().into_iter().collect();
+        assert_eq!(
+            chennai_files,
+            HashSet::from([
+                
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_2-986-2794_20251220210108078.parquet",
+                
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_0-1112-3190_20251220210129235.parquet",
+                
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210127080.log.1_0-1072-3078",
+                
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210128625.log.1_0-1097-3150",
+            ])
+        );
+        assert!(chennai.total_size() > 0);
+    }
+
+    #[test]
+    fn hudi_table_get_metadata_table_partitions() {
+        let data_table = get_data_table();
+
+        // Verify we can get the MDT partitions from the data table
+        let partitions = data_table.get_metadata_table_partitions();
+
+        // The test table has 5 MDT partitions configured
+        assert_eq!(
+            partitions.len(),
+            5,
+            "Should have 5 MDT partitions, got: {:?}",
+            partitions
+        );
+
+        // Verify all expected partitions are present (membership only; order is not asserted)
+        let expected = [
+            "column_stats",
+            "files",
+            "partition_stats",
+            "record_index",
+            "secondary_index_rider_idx",
+        ];
+        for partition in &expected {
+            assert!(
+                partitions.contains(&partition.to_string()),
+                "Should contain '{}' partition, got: {:?}",
+                partition,
+                partitions
+            );
+        }
+    }
 }
diff --git a/crates/core/src/table/validation.rs 
b/crates/core/src/table/validation.rs
index 3a9d161..381b7ea 100644
--- a/crates/core/src/table/validation.rs
+++ b/crates/core/src/table/validation.rs
@@ -18,13 +18,16 @@
  */
 use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
 use crate::config::read::HudiReadConfig;
+use crate::config::table::BaseFileFormatValue;
 use crate::config::table::HudiTableConfig;
 use crate::config::table::HudiTableConfig::{
-    DropsPartitionFields, TableVersion, TimelineLayoutVersion,
+    BaseFileFormat, BasePath, DropsPartitionFields, TableVersion, 
TimelineLayoutVersion,
 };
 use crate::config::HudiConfigs;
 use crate::error::CoreError;
 use crate::merge::record_merger::RecordMerger;
+use crate::util::path::is_metadata_table_path;
+use std::str::FromStr;
 use strum::IntoEnumIterator;
 
 pub fn validate_configs(hudi_configs: &HudiConfigs) -> 
crate::error::Result<()> {
@@ -68,6 +71,20 @@ pub fn validate_configs(hudi_configs: &HudiConfigs) -> 
crate::error::Result<()>
         )));
     }
 
+    // Validate HFile format is only used for metadata tables
+    if let Ok(base_file_format_str) = hudi_configs.get(BaseFileFormat) {
+        let format_str: String = base_file_format_str.into();
+        if let Ok(BaseFileFormatValue::HFile) = 
BaseFileFormatValue::from_str(&format_str) {
+            let base_path: String = 
hudi_configs.get_or_default(BasePath).into();
+            if !is_metadata_table_path(&base_path) {
+                return Err(CoreError::Unsupported(format!(
+                    "Base file format '{}' is only valid for metadata tables",
+                    format_str
+                )));
+            }
+        }
+    }
+
     RecordMerger::validate_configs(hudi_configs)?;
 
     Ok(())
@@ -182,4 +199,56 @@ mod tests {
             panic!("Expected CoreError::Unsupported for v6 with layout 2");
         }
     }
+
+    #[test]
+    fn test_hfile_format_rejected_for_regular_table() {
+        use crate::config::table::HudiTableConfig::BaseFileFormat;
+
+        let mut options = HashMap::new();
+        options.insert(TableName.as_ref().to_string(), 
"test_table".to_string());
+        options.insert(TableType.as_ref().to_string(), 
"MERGE_ON_READ".to_string());
+        options.insert(TableVersion.as_ref().to_string(), "8".to_string());
+        options.insert(TimelineLayoutVersion.as_ref().to_string(), 
"2".to_string());
+        options.insert(BaseFileFormat.as_ref().to_string(), 
"hfile".to_string());
+        options.insert("hoodie.base.path".to_string(), 
"/data/my_table".to_string());
+
+        let configs = HudiConfigs::new(options);
+        let result = validate_configs(&configs); // base path does not end with `.hoodie/metadata`, so HFile must be rejected
+
+        assert!(result.is_err());
+        if let Err(CoreError::Unsupported(msg)) = result {
+            assert!(
+                msg.contains("only valid for metadata tables"),
+                "Unexpected message: {}",
+                msg
+            );
+        } else {
+            panic!("Expected CoreError::Unsupported for HFile on regular 
table");
+        }
+    }
+
+    #[test]
+    fn test_hfile_format_allowed_for_metadata_table() {
+        use crate::config::table::HudiTableConfig::BaseFileFormat;
+
+        let mut options = HashMap::new();
+        options.insert(TableName.as_ref().to_string(), "metadata".to_string());
+        options.insert(TableType.as_ref().to_string(), 
"MERGE_ON_READ".to_string());
+        options.insert(TableVersion.as_ref().to_string(), "8".to_string());
+        options.insert(TimelineLayoutVersion.as_ref().to_string(), 
"2".to_string());
+        options.insert(BaseFileFormat.as_ref().to_string(), 
"hfile".to_string());
+        options.insert(
+            "hoodie.base.path".to_string(),
+            "/data/my_table/.hoodie/metadata".to_string(), // ends with `.hoodie/metadata`, i.e. a metadata table path
+        );
+
+        let configs = HudiConfigs::new(options);
+        let result = validate_configs(&configs);
+
+        assert!(
+            result.is_ok(),
+            "HFile format should be allowed for metadata table: {:?}",
+            result
+        );
+    }
 }
diff --git a/crates/core/src/timeline/instant.rs 
b/crates/core/src/timeline/instant.rs
index 1cd29af..4138fbe 100644
--- a/crates/core/src/timeline/instant.rs
+++ b/crates/core/src/timeline/instant.rs
@@ -194,9 +194,35 @@ impl Instant {
         Ok(())
     }
 
+    /// Parse a timestamp string into a UTC DateTime.
+    ///
+    /// Supports two formats:
+    /// 1. Date format: `yyyyMMddHHmmss` (14 chars) or `yyyyMMddHHmmssSSS` (17 
chars)
+    /// 2. Epoch milliseconds: 17-digit number representing milliseconds since 
Unix epoch
+    ///    (used by metadata table for timestamps like `00000000000000000`)
+    ///
+    /// The function tries date format first, then falls back to epoch 
milliseconds
+    /// if the date parsing fails (e.g., invalid month/day like `00`).
     pub fn parse_datetime(timestamp: &str, timezone: &str) -> 
Result<DateTime<Utc>> {
-        let naive_dt = Self::parse_naive_datetime(timestamp)?;
-        Self::convert_to_timezone(naive_dt, timezone)
+        // Date format takes precedence: a string that parses as a date is never treated as epoch millis
+        if let Ok(naive_dt) = Self::parse_naive_datetime(timestamp) {
+            return Self::convert_to_timezone(naive_dt, timezone);
+        }
+
+        // Fallback: treat as epoch milliseconds (zero-padded 17-digit number)
+        // This handles metadata table timestamps like 00000000000000000, 
00000000000000001, etc.
+        if timestamp.len() == 17 && timestamp.chars().all(|c| 
c.is_ascii_digit()) {
+            let epoch_ms: i64 = timestamp
+                .parse()
+                .map_err(|e| CoreError::Timeline(format!("Invalid epoch 
timestamp: {}", e)))?;
+            return DateTime::from_timestamp_millis(epoch_ms) // `timezone` is not used on this path
+                .ok_or_else(|| CoreError::Timeline("Invalid epoch 
millis".to_string()));
+        }
+
+        Err(CoreError::Timeline(format!(
+            "Cannot parse timestamp '{}': not a valid date format or epoch 
millis",
+            timestamp
+        )))
     }
 
     fn parse_naive_datetime(timestamp: &str) -> Result<NaiveDateTime> {
@@ -538,4 +564,53 @@ mod tests {
         let result = Instant::from_str("202403151425301.commit");
         assert!(result.is_err());
     }
+
+    #[test]
+    fn test_parse_datetime_epoch_millis() -> Result<()> {
+        // Epoch 0 (1970-01-01 00:00:00.000 UTC)
+        let dt = Instant::parse_datetime("00000000000000000", "UTC")?;
+        assert_eq!(dt.timestamp_millis(), 0);
+
+        // Epoch 1ms
+        let dt = Instant::parse_datetime("00000000000000001", "UTC")?;
+        assert_eq!(dt.timestamp_millis(), 1);
+
+        // Epoch 1000ms (1 second)
+        let dt = Instant::parse_datetime("00000000000001000", "UTC")?;
+        assert_eq!(dt.timestamp_millis(), 1000);
+
+        // A larger epoch value, left-padded with zeros to the required 17 digits
+        let dt = Instant::parse_datetime("00001734567890123", "UTC")?;
+        assert_eq!(dt.timestamp_millis(), 1734567890123);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_parse_datetime_epoch_ordering() -> Result<()> {
+        // Verify ordering: epoch 0 < epoch 1 < epoch 2 < ... < real timestamp
+        let epoch_0 = Instant::parse_datetime("00000000000000000", "UTC")?;
+        let epoch_1 = Instant::parse_datetime("00000000000000001", "UTC")?;
+        let epoch_2 = Instant::parse_datetime("00000000000000002", "UTC")?;
+        let real_ts = Instant::parse_datetime("20240315142530500", "UTC")?; // a valid date string, so the date path applies
+
+        assert!(epoch_0 < epoch_1);
+        assert!(epoch_1 < epoch_2);
+        assert!(epoch_2 < real_ts);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_parse_datetime_date_format_still_works() -> Result<()> {
+        // Valid date format should still parse correctly (not treated as 
epoch)
+        let dt = Instant::parse_datetime("20240101120000000", "UTC")?; // all digits and 17 long, yet the date path wins
+        // This is Jan 1, 2024, 12:00:00.000 UTC - NOT epoch 20240101120000000
+        assert_eq!(
+            dt.format("%Y-%m-%d %H:%M:%S").to_string(),
+            "2024-01-01 12:00:00"
+        );
+
+        Ok(())
+    }
 }
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index c9aa85b..744d992 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -18,3 +18,4 @@
  */
 pub mod arrow;
 pub mod collection;
+pub mod path;
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/util/path.rs
similarity index 51%
copy from crates/core/src/metadata/mod.rs
copy to crates/core/src/util/path.rs
index e1c326b..77a167b 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/util/path.rs
@@ -16,18 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-pub mod commit;
-pub mod merger;
-pub mod meta_field;
-pub mod replace_commit;
-pub mod table_record;
 
-pub const HUDI_METADATA_DIR: &str = ".hoodie";
-pub const DELTALAKE_METADATA_DIR: &str = "_delta_log";
-pub const ICEBERG_METADATA_DIR: &str = "metadata";
+/// Check if the given path is a metadata table path.
+///
+/// True when `path`, with any trailing `/` removed, ends with `.hoodie/metadata`.
+pub fn is_metadata_table_path(path: &str) -> bool {
+    path.trim_end_matches('/').ends_with(".hoodie/metadata") // NOTE(review): plain suffix match, so `/x/foo.hoodie/metadata` also passes — confirm intended
+}
 
-pub const LAKE_FORMAT_METADATA_DIRS: &[&str; 3] = &[
-    HUDI_METADATA_DIR,
-    DELTALAKE_METADATA_DIR,
-    ICEBERG_METADATA_DIR,
-];
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_is_metadata_table_path() {
+        assert!(is_metadata_table_path("/data/my_table/.hoodie/metadata"));
+        assert!(is_metadata_table_path("/data/my_table/.hoodie/metadata/")); // trailing slash is ignored
+        assert!(is_metadata_table_path("s3://bucket/table/.hoodie/metadata")); // URL-style paths work the same way
+        assert!(!is_metadata_table_path("/data/my_table"));
+        assert!(!is_metadata_table_path("/data/my_table/.hoodie")); // `.hoodie` alone is not the metadata table
+        assert!(!is_metadata_table_path("/data/.hoodie/metadata/files")); // a sub-path of the metadata table is rejected
+    }
+}

Reply via email to