This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 23a3b4a feat: support reading metadata table (files) (#499)
23a3b4a is described below
commit 23a3b4a8f092439b7d7b5b61de8f7980c18a3e66
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Dec 22 10:38:32 2025 -0600
feat: support reading metadata table (files) (#499)
---
crates/core/src/config/table.rs | 39 +++
crates/core/src/file_group/file_slice.rs | 11 +
crates/core/src/file_group/reader.rs | 349 ++++++++++++++++++++++
crates/core/src/hfile/reader.rs | 265 ++++++++++++++++
crates/core/src/metadata/mod.rs | 3 +
crates/core/src/metadata/table_record.rs | 3 +
crates/core/src/storage/mod.rs | 2 +
crates/core/src/table/mod.rs | 219 +++++++++++++-
crates/core/src/table/validation.rs | 71 ++++-
crates/core/src/timeline/instant.rs | 79 ++++-
crates/core/src/util/mod.rs | 1 +
crates/core/src/{metadata/mod.rs => util/path.rs} | 33 +-
12 files changed, 1056 insertions(+), 19 deletions(-)
diff --git a/crates/core/src/config/table.rs b/crates/core/src/config/table.rs
index e3fd273..bcaf3b9 100644
--- a/crates/core/src/config/table.rs
+++ b/crates/core/src/config/table.rs
@@ -128,6 +128,20 @@ pub enum HudiTableConfig {
/// Path for LSM timeline history for layout v2, relative to timeline path (default: history)
/// The full path will be `.hoodie/{TimelinePath}/{TimelineHistoryPath}`
TimelineHistoryPath,
+
+ /// Enable the internal metadata table which serves table metadata like file listings.
+ ///
+ /// When enabled, file listings are read from the metadata table instead of storage,
+ /// which can significantly improve performance for tables with many partitions.
+ MetadataTableEnabled,
+
+ /// List of metadata table partitions enabled for this table.
+ ///
+ /// This config is read from the data table's hoodie.properties and specifies which
+ /// partitions are available in the metadata table (e.g., "files", "column_stats").
+ /// When creating a metadata table instance, this value should be passed as the
+ /// PartitionFields option.
+ MetadataTablePartitions,
}
impl AsRef<str> for HudiTableConfig {
@@ -155,6 +169,8 @@ impl AsRef<str> for HudiTableConfig {
Self::ArchiveLogFolder => "hoodie.archivelog.folder",
Self::TimelinePath => "hoodie.timeline.path",
Self::TimelineHistoryPath => "hoodie.timeline.history.path",
+ Self::MetadataTableEnabled => "hoodie.metadata.enable",
+ Self::MetadataTablePartitions => "hoodie.table.metadata.partitions",
}
}
}
@@ -175,6 +191,8 @@ impl ConfigParser for HudiTableConfig {
)),
Self::DatabaseName => Some(HudiConfigValue::String("default".to_string())),
Self::DropsPartitionFields => Some(HudiConfigValue::Boolean(false)),
+ Self::IsHiveStylePartitioning => Some(HudiConfigValue::Boolean(false)),
+ Self::IsPartitionPathUrlencoded => Some(HudiConfigValue::Boolean(false)),
Self::PartitionFields => Some(HudiConfigValue::List(vec![])),
Self::PopulatesMetaFields => Some(HudiConfigValue::Boolean(true)),
Self::TimelineTimezone => Some(HudiConfigValue::String(
@@ -183,6 +201,8 @@ impl ConfigParser for HudiTableConfig {
Self::ArchiveLogFolder => Some(HudiConfigValue::String(".hoodie/archived".to_string())),
Self::TimelinePath => Some(HudiConfigValue::String("timeline".to_string())),
Self::TimelineHistoryPath => Some(HudiConfigValue::String("history".to_string())),
+ Self::MetadataTableEnabled => Some(HudiConfigValue::Boolean(false)),
+ Self::MetadataTablePartitions => Some(HudiConfigValue::List(vec![])),
_ => None,
}
}
@@ -258,6 +278,13 @@ impl ConfigParser for HudiTableConfig {
Self::ArchiveLogFolder => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::TimelinePath => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::TimelineHistoryPath => get_result.map(|v| HudiConfigValue::String(v.to_string())),
+ Self::MetadataTableEnabled => get_result
+ .and_then(|v| {
+ bool::from_str(v).map_err(|e| ParseBool(self.key(), v.to_string(), e))
+ })
+ .map(HudiConfigValue::Boolean),
+ Self::MetadataTablePartitions => get_result
+ .map(|v| HudiConfigValue::List(v.split(',').map(str::to_string).collect())),
}
}
@@ -323,6 +350,9 @@ impl FromStr for TableTypeValue {
pub enum BaseFileFormatValue {
#[strum(serialize = "parquet")]
Parquet,
+ /// HFile format - only valid for metadata tables.
+ #[strum(serialize = "hfile")]
+ HFile,
}
impl FromStr for BaseFileFormatValue {
@@ -331,6 +361,7 @@ impl FromStr for BaseFileFormatValue {
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"parquet" => Ok(Self::Parquet),
+ "hfile" => Ok(Self::HFile),
"orc" => Err(UnsupportedValue(s.to_string())),
v => Err(InvalidValue(v.to_string())),
}
@@ -423,6 +454,14 @@ mod tests {
BaseFileFormatValue::from_str("PArquet").unwrap(),
BaseFileFormatValue::Parquet
);
+ assert_eq!(
+ BaseFileFormatValue::from_str("hfile").unwrap(),
+ BaseFileFormatValue::HFile
+ );
+ assert_eq!(
+ BaseFileFormatValue::from_str("HFILE").unwrap(),
+ BaseFileFormatValue::HFile
+ );
assert!(matches!(
BaseFileFormatValue::from_str("").unwrap_err(),
InvalidValue(_)
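
For context on how these two keys behave at the call site, here is a minimal sketch, assuming the crate is consumed as `hudi_core` and reusing the `HudiConfigs::new` tuple form and the `get_or_default(...).into()` pattern that appear in the tests further down:

    use hudi_core::config::table::HudiTableConfig;
    use hudi_core::config::HudiConfigs;

    // Illustration only: resolve the new metadata-table keys from explicit options.
    // When absent, they fall back to the defaults added above (false / empty list).
    let configs = HudiConfigs::new([
        (HudiTableConfig::MetadataTableEnabled, "true"),
        (HudiTableConfig::MetadataTablePartitions, "files,column_stats"),
    ]);
    let enabled: bool = configs
        .get_or_default(HudiTableConfig::MetadataTableEnabled)
        .into();
    let partitions: Vec<String> = configs
        .get_or_default(HudiTableConfig::MetadataTablePartitions)
        .into();
    assert!(enabled);
    assert_eq!(partitions, vec!["files".to_string(), "column_stats".to_string()]);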
diff --git a/crates/core/src/file_group/file_slice.rs b/crates/core/src/file_group/file_slice.rs
index 7218188..c81b62e 100644
--- a/crates/core/src/file_group/file_slice.rs
+++ b/crates/core/src/file_group/file_slice.rs
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+use crate::config::table::BaseFileFormatValue;
use crate::error::CoreError;
use crate::file_group::base_file::BaseFile;
use crate::file_group::log_file::LogFile;
@@ -112,12 +113,22 @@ impl FileSlice {
/// Load [FileMetadata] from storage layer for the [BaseFile] if `file_metadata` is [None]
/// or if `file_metadata` is not fully populated.
+ ///
+ /// This only loads metadata for Parquet files. For non-Parquet files (e.g., HFile),
+ /// this is a no-op since Parquet-specific metadata reading would fail.
+ /// TODO: see if mdt read would benefit from loading hfile metadata as well.
pub async fn load_metadata_if_needed(&mut self, storage: &Storage) -> Result<()> {
+ // Skip non-Parquet files - metadata loading uses Parquet-specific APIs
+ if self.base_file.extension != BaseFileFormatValue::Parquet.as_ref() {
+ return Ok(());
+ }
+
if let Some(metadata) = &self.base_file.file_metadata {
if metadata.fully_populated {
return Ok(());
}
}
+
let relative_path = self.base_file_relative_path()?;
let fetched_metadata = storage.get_file_metadata(&relative_path).await?;
self.base_file.file_metadata = Some(fetched_metadata);
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 7f41c0b..1048662 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -26,8 +26,11 @@ use crate::expr::filter::{Filter, SchemableFilter};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::log_file::scanner::{LogFileScanner, ScanResult};
use crate::file_group::record_batches::RecordBatches;
+use crate::hfile::HFileReader;
use crate::merge::record_merger::RecordMerger;
+use crate::metadata::merger::FilesPartitionMerger;
use crate::metadata::meta_field::MetaField;
+use crate::metadata::table_record::FilesPartitionRecord;
use crate::storage::Storage;
use crate::table::builder::OptionResolver;
use crate::timeline::selector::InstantRange;
@@ -36,6 +39,7 @@ use arrow::compute::and;
use arrow::compute::filter_record_batch;
use arrow_array::{BooleanArray, RecordBatch};
use futures::TryFutureExt;
+use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;
@@ -320,6 +324,158 @@ impl FileGroupReader {
.build()?
.block_on(self.read_file_slice_from_paths(base_file_path, log_file_paths))
}
+
+ // =========================================================================
+ // Metadata Table (MDT) File Slice Reading
+ // =========================================================================
+
+ /// Check if this reader is configured for a metadata table.
+ ///
+ /// Detection is based on the base path ending with `.hoodie/metadata`.
+ pub fn is_metadata_table(&self) -> bool {
+ let base_path: String = self
+ .hudi_configs
+ .get_or_default(HudiTableConfig::BasePath)
+ .into();
+ crate::util::path::is_metadata_table_path(&base_path)
+ }
+
+ /// Reads a metadata table file slice and returns merged FilesPartitionRecords.
+ ///
+ /// This method is specifically designed for reading the metadata table's `files` partition,
+ /// which uses HFile format for base files and HFile blocks in log files.
+ ///
+ /// # Arguments
+ /// * `base_file_path` - Relative path to the HFile base file
+ /// * `log_file_paths` - Relative paths to log files
+ ///
+ /// # Returns
+ /// A HashMap mapping record keys (partition paths or "__all_partitions__") to
+ /// merged `FilesPartitionRecord`s containing file information.
+ ///
+ /// # Example
+ /// ```ignore
+ /// let reader = FileGroupReader::new_with_options(mdt_base_uri, options)?;
+ /// let merged = reader.read_file_slice_from_mdt_paths(
+ /// "files/files-0000-0_0-0-0_00000000000000.hfile",
+ /// vec!["files/.files-0000-0_20240101120000000.log.1_0-100-200"],
+ /// ).await?;
+ ///
+ /// // Get file listing for a partition
+ /// if let Some(record) = merged.get("city=chennai") {
+ /// for file_name in record.active_file_names() {
+ /// println!("Active file: {}", file_name);
+ /// }
+ /// }
+ /// ```
+ pub async fn read_file_slice_from_mdt_paths<I, S>(
+ &self,
+ base_file_path: &str,
+ log_file_paths: I,
+ ) -> Result<HashMap<String, FilesPartitionRecord>>
+ where
+ I: IntoIterator<Item = S>,
+ S: AsRef<str>,
+ {
+ let log_file_paths: Vec<String> = log_file_paths
+ .into_iter()
+ .map(|s| s.as_ref().to_string())
+ .collect();
+
+ // Read HFile base file using async open
+ let mut hfile_reader = HFileReader::open(&self.storage, base_file_path)
+ .await
+ .map_err(|e| {
+ ReadFileSliceError(format!(
+ "Failed to read MDT base file {}: {:?}",
+ base_file_path, e
+ ))
+ })?;
+
+ // Get Avro schema from HFile
+ let schema = hfile_reader
+ .get_avro_schema()
+ .map_err(|e| ReadFileSliceError(format!("Failed to get Avro schema: {:?}", e)))?
+ .ok_or_else(|| ReadFileSliceError("No Avro schema found in HFile".to_string()))?
+ .clone();
+
+ // Collect base records
+ let base_records = hfile_reader
+ .collect_records()
+ .map_err(|e| ReadFileSliceError(format!("Failed to collect HFile records: {:?}", e)))?;
+
+ // Scan log files if present
+ let log_records = if log_file_paths.is_empty() {
+ vec![]
+ } else {
+ let instant_range = self.create_instant_range_for_log_file_scan();
+ let scan_result = LogFileScanner::new(self.hudi_configs.clone(), self.storage.clone())
+ .scan(log_file_paths, &instant_range)
+ .await?;
+
+ match scan_result {
+ ScanResult::HFileRecords(records) => records,
+ ScanResult::Empty => vec![],
+ ScanResult::RecordBatches(_) => {
+ return Err(CoreError::LogBlockError(
+ "Unexpected RecordBatches in metadata table log file".to_string(),
+ ));
+ }
+ }
+ };
+
+ // Merge base and log records
+ let merger = FilesPartitionMerger::new(schema);
+ merger.merge(&base_records, &log_records)
+ }
+
+ /// Same as [FileGroupReader::read_file_slice_from_mdt_paths], but blocking.
+ pub fn read_file_slice_from_mdt_paths_blocking<I, S>(
+ &self,
+ base_file_path: &str,
+ log_file_paths: I,
+ ) -> Result<HashMap<String, FilesPartitionRecord>>
+ where
+ I: IntoIterator<Item = S>,
+ S: AsRef<str>,
+ {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(self.read_file_slice_from_mdt_paths(base_file_path, log_file_paths))
+ }
+
+ /// Reads a metadata table file slice using a FileSlice object.
+ ///
+ /// Convenience wrapper around [FileGroupReader::read_file_slice_from_mdt_paths].
+ pub async fn read_file_slice_from_mdt(
+ &self,
+ file_slice: &FileSlice,
+ ) -> Result<HashMap<String, FilesPartitionRecord>> {
+ let base_file_path = file_slice.base_file_relative_path()?;
+ let log_file_paths = if file_slice.has_log_file() {
+ file_slice
+ .log_files
+ .iter()
+ .map(|log_file| file_slice.log_file_relative_path(log_file))
+ .collect::<Result<Vec<String>>>()?
+ } else {
+ vec![]
+ };
+ self.read_file_slice_from_mdt_paths(&base_file_path, log_file_paths)
+ .await
+ }
+
+ /// Same as [FileGroupReader::read_file_slice_from_mdt], but blocking.
+ pub fn read_file_slice_from_mdt_blocking(
+ &self,
+ file_slice: &FileSlice,
+ ) -> Result<HashMap<String, FilesPartitionRecord>> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(self.read_file_slice_from_mdt(file_slice))
+ }
}
#[cfg(test)]
@@ -631,4 +787,197 @@ mod tests {
Ok(())
}
+
+ // =========================================================================
+ // MDT File Slice Reading Tests
+ // =========================================================================
+
+ fn get_metadata_table_base_uri() -> String {
+ use hudi_test::QuickstartTripsTable;
+ let table_path = QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
+ let mdt_path = PathBuf::from(table_path).join(".hoodie").join("metadata");
+ let url = Url::from_file_path(canonicalize(&mdt_path).unwrap()).unwrap();
+ url.as_ref().to_string()
+ }
+
+ /// Create a FileGroupReader for MDT without trying to resolve options from storage.
+ fn create_mdt_reader() -> Result<FileGroupReader> {
+ let mdt_uri = get_metadata_table_base_uri();
+ let hudi_configs = Arc::new(HudiConfigs::new([(
+ HudiTableConfig::BasePath,
+ mdt_uri.as_str(),
+ )]));
+ FileGroupReader::new_with_configs_and_overwriting_options(hudi_configs, empty_options())
+ }
+
+ #[test]
+ fn test_is_metadata_table_detection() -> Result<()> {
+ // Regular table should return false
+ let base_uri = get_base_uri_with_valid_props();
+ let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?;
+ assert!(!reader.is_metadata_table());
+
+ // Metadata table should return true
+ let mdt_reader = create_mdt_reader()?;
+ assert!(mdt_reader.is_metadata_table());
+
+ Ok(())
+ }
+
+ /// Initial HFile base file for the files partition (all zeros timestamp).
+ const MDT_FILES_BASE_FILE: &str = "files/files-0000-0_0-955-2690_00000000000000000.hfile";
+
+ /// Log files for the V8Trips8I3U1D test table's files partition.
+ const MDT_FILES_LOG_FILES: &[&str] = &[
+ "files/.files-0000-0_20251220210108078.log.1_10-999-2838",
+ "files/.files-0000-0_20251220210123755.log.1_3-1032-2950",
+ "files/.files-0000-0_20251220210125441.log.1_5-1057-3024",
+ "files/.files-0000-0_20251220210127080.log.1_3-1082-3100",
+ "files/.files-0000-0_20251220210128625.log.1_5-1107-3174",
+ "files/.files-0000-0_20251220210129235.log.1_3-1118-3220",
+ "files/.files-0000-0_20251220210130911.log.1_3-1149-3338",
+ ];
+
+ #[test]
+ fn test_read_file_slice_from_mdt_paths_without_log_files() -> Result<()> {
+ use crate::metadata::table_record::MetadataRecordType;
+
+ let reader = create_mdt_reader()?;
+
+ // Read base file only (no log files)
+ let log_files: Vec<&str> = vec![];
+ let merged =
+ reader.read_file_slice_from_mdt_paths_blocking(MDT_FILES_BASE_FILE, log_files)?;
+
+ // Initial base file only has __all_partitions__ record
+ // City partition records are added through log files
+ assert_eq!(merged.len(), 1, "Base file should have 1 key");
+ assert!(merged.contains_key("__all_partitions__"));
+
+ // Validate __all_partitions__ record
+ let all_parts = merged.get("__all_partitions__").unwrap();
+ assert_eq!(all_parts.record_type, MetadataRecordType::AllPartitions);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_read_file_slice_from_mdt_paths_with_log_files() -> Result<()> {
+ use crate::metadata::table_record::MetadataRecordType;
+
+ let reader = create_mdt_reader()?;
+
+ // Read base file + all log files
+ let merged = reader.read_file_slice_from_mdt_paths_blocking(
+ MDT_FILES_BASE_FILE,
+ MDT_FILES_LOG_FILES.to_vec(),
+ )?;
+
+ // Should have 4 keys after merging: __all_partitions__ + 3 city partitions
+ assert_eq!(merged.len(), 4, "Should have 4 partition keys after merge");
+
+ // Validate all partition keys have correct record types
+ for (key, record) in &merged {
+ if key == "__all_partitions__" {
+ assert_eq!(record.record_type, MetadataRecordType::AllPartitions);
+ } else {
+ assert_eq!(record.record_type, MetadataRecordType::Files);
+ }
+ }
+
+ // Expected UUIDs for each partition's files
+ const CHENNAI_UUID: &str = "6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc";
+ const SAN_FRANCISCO_UUID: &str = "036ded81-9ed4-479f-bcea-7145dfa0079b";
+ const SAO_PAULO_UUID: &str = "8aa68f7e-afd6-4c94-b86c-8a886552e08d";
+
+ // Validate chennai partition
+ let chennai = merged.get("city=chennai").unwrap();
+ let active_files = chennai.active_file_names();
+ assert!(
+ active_files.len() >= 2,
+ "Chennai should have at least 2 active files, got {}",
+ active_files.len()
+ );
+ assert!(chennai.total_size() > 0, "Total size should be > 0");
+ for file_name in &active_files {
+ assert!(
+ file_name.contains(CHENNAI_UUID),
+ "Chennai file should contain UUID: {}",
+ file_name
+ );
+ }
+
+ // Validate san_francisco partition
+ let sf = merged.get("city=san_francisco").unwrap();
+ for file_name in sf.active_file_names() {
+ assert!(
+ file_name.contains(SAN_FRANCISCO_UUID),
+ "San Francisco file should contain UUID: {}",
+ file_name
+ );
+ }
+
+ // Validate sao_paulo partition
+ let sp = merged.get("city=sao_paulo").unwrap();
+ for file_name in sp.active_file_names() {
+ assert!(
+ file_name.contains(SAO_PAULO_UUID),
+ "Sao Paulo file should contain UUID: {}",
+ file_name
+ );
+ }
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_read_file_slice_from_mdt_error_handling() -> Result<()> {
+ let reader = create_mdt_reader()?;
+
+ // Test with non-existent base file
+ let result = reader
+ .read_file_slice_from_mdt_paths_blocking("files/nonexistent.hfile", Vec::<&str>::new());
+
+ assert!(result.is_err(), "Should error on non-existent file");
+ let err = result.unwrap_err().to_string();
+ assert!(
+ err.contains("Failed to read MDT base file"),
+ "Error should mention MDT base file: {}",
+ err
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_read_file_slice_from_mdt_blocking() -> Result<()> {
+ use crate::file_group::FileGroup;
+
+ let reader = create_mdt_reader()?;
+
+ // Build FileGroup using the API
+ let mut fg = FileGroup::new("files-0000-0".to_string(), "files".to_string());
+ let base_file_name = MDT_FILES_BASE_FILE.strip_prefix("files/").unwrap();
+ fg.add_base_file_from_name(base_file_name)?;
+ let log_file_names: Vec<_> = MDT_FILES_LOG_FILES
+ .iter()
+ .map(|s| s.strip_prefix("files/").unwrap())
+ .collect();
+ fg.add_log_files_from_names(log_file_names)?;
+
+ let file_slice = fg
+ .get_file_slice_as_of("99999999999999999")
+ .expect("Should have file slice");
+
+ let merged = reader.read_file_slice_from_mdt_blocking(file_slice)?;
+
+ // Should have 4 keys: __all_partitions__ + 3 city partitions
+ assert_eq!(merged.len(), 4);
+ assert!(merged.contains_key("__all_partitions__"));
+ assert!(merged.contains_key("city=chennai"));
+ assert!(merged.contains_key("city=san_francisco"));
+ assert!(merged.contains_key("city=sao_paulo"));
+
+ Ok(())
+ }
}
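
As a usage sketch for the new blocking entry point (the `hudi_core::...` module paths and the error alias are assumptions; the base and log file paths are the test fixtures above):

    use std::collections::HashMap;
    use hudi_core::file_group::reader::FileGroupReader;          // module path assumed
    use hudi_core::metadata::table_record::FilesPartitionRecord; // module path assumed

    // Given a FileGroupReader rooted at `<table>/.hoodie/metadata`, read one
    // `files` file slice and print the merged per-partition listings.
    fn print_files_partition(reader: &FileGroupReader) -> hudi_core::error::Result<()> {
        let merged: HashMap<String, FilesPartitionRecord> = reader
            .read_file_slice_from_mdt_paths_blocking(
                "files/files-0000-0_0-955-2690_00000000000000000.hfile",
                vec!["files/.files-0000-0_20251220210108078.log.1_10-999-2838"],
            )?;
        for (key, record) in &merged {
            println!("{key}: {} active file(s)", record.active_file_names().len());
        }
        Ok(())
    }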
diff --git a/crates/core/src/hfile/reader.rs b/crates/core/src/hfile/reader.rs
index ac06d38..46a6846 100644
--- a/crates/core/src/hfile/reader.rs
+++ b/crates/core/src/hfile/reader.rs
@@ -30,6 +30,7 @@ use crate::hfile::key::{compare_keys, Key, KeyValue, Utf8Key};
use crate::hfile::proto::InfoProto;
use crate::hfile::record::HFileRecord;
use crate::hfile::trailer::HFileTrailer;
+use crate::storage::Storage;
use apache_avro::Schema as AvroSchema;
use prost::Message;
use std::cell::OnceCell;
@@ -132,6 +133,29 @@ impl HFileReader {
Ok(reader)
}
+ /// Open an HFile from storage.
+ ///
+ /// This is an async factory method that reads the file from storage
+ /// and creates an HFileReader.
+ ///
+ /// # Arguments
+ /// * `storage` - The storage to read from
+ /// * `relative_path` - The relative path to the HFile
+ ///
+ /// # Example
+ /// ```ignore
+ /// let reader = HFileReader::open(&storage, "files/data.hfile").await?;
+ /// for record in reader.iter()? {
+ /// println!("{:?}", record?);
+ /// }
+ /// ```
+ pub async fn open(storage: &Storage, relative_path: &str) -> Result<Self> {
+ let bytes = storage.get_file_data(relative_path).await.map_err(|e| {
+ HFileError::InvalidFormat(format!("Failed to read HFile {}: {:?}", relative_path, e))
+ })?;
+ Self::new(bytes.to_vec())
+ }
+
/// Initialize metadata by reading index blocks and file info.
fn initialize_metadata(&mut self) -> Result<()> {
// Read the "load-on-open" section starting from load_on_open_data_offset
@@ -2269,4 +2293,245 @@ mod tests {
);
assert!(total_size > 0, "Total size across partitions should be > 0");
}
+
+ // ================== File Info and Meta Block Tests ==================
+
+ #[test]
+ fn test_get_file_info_last_key() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // LASTKEY should be present in file info
+ let last_key = reader.get_file_info("hfile.LASTKEY");
+ assert!(last_key.is_some(), "LASTKEY should be present");
+
+ // Parse the last key - it's the structured key bytes
+ let last_key_bytes = last_key.unwrap();
+ assert!(!last_key_bytes.is_empty(), "LASTKEY should not be empty");
+ }
+
+ #[test]
+ fn test_get_file_info_not_found() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Non-existent key should return None
+ let result = reader.get_file_info("nonexistent.key");
+ assert!(result.is_none());
+ }
+
+ #[test]
+ fn test_get_avro_schema_from_metadata_hfile() {
+ let bytes = read_metadata_table_hfile();
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Metadata table HFiles should have embedded Avro schema
+ let schema = reader.get_avro_schema().expect("Failed to get schema");
+ assert!(schema.is_some(), "Metadata HFile should have Avro schema");
+
+ let avro_schema = schema.unwrap();
+ // Schema should be a record type for HoodieMetadataRecord
+ assert!(
+ matches!(avro_schema, AvroSchema::Record(_)),
+ "Schema should be a record type"
+ );
+ }
+
+ #[test]
+ fn test_get_avro_schema_regular_hfile() {
+ // Regular test HFiles don't have Avro schema in file info
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let schema = reader.get_avro_schema().expect("Failed to get schema");
+ // Regular HFiles typically don't have embedded Avro schema
+ assert!(
+ schema.is_none(),
+ "Regular HFile should not have Avro schema"
+ );
+ }
+
+ #[test]
+ fn test_read_min_max_record_keys_from_metadata_hfile() {
+ let bytes = read_metadata_table_hfile();
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Metadata table HFiles should have min/max record keys
+ let result = reader.read_min_max_record_keys();
+
+ // The metadata HFile may or may not have these keys depending on how it was created
+ // If present, verify the structure
+ if let Some((min_key, max_key)) = result {
+ assert!(!min_key.is_empty(), "Min key should not be empty");
+ assert!(!max_key.is_empty(), "Max key should not be empty");
+ // Min should be <= Max lexicographically
+ assert!(
+ min_key <= max_key,
+ "Min key should be <= Max key: {} vs {}",
+ min_key,
+ max_key
+ );
+ }
+ }
+
+ #[test]
+ fn test_read_min_max_record_keys_regular_hfile() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Regular test HFiles typically don't have min/max record keys
+ let result = reader.read_min_max_record_keys();
+ assert!(
+ result.is_none(),
+ "Regular HFile should not have min/max record keys"
+ );
+ }
+
+ // ================== Error Handling Tests ==================
+
+ #[test]
+ fn test_invalid_hfile_too_small() {
+ // File too small to contain a valid trailer
+ let bytes = vec![0u8; 10];
+ let result = HFileReader::new(bytes);
+ assert!(result.is_err(), "Should fail for file too small");
+ }
+
+ #[test]
+ fn test_invalid_hfile_bad_magic() {
+ // Create a file with wrong magic bytes at the end
+ let mut bytes = vec![0u8; 100];
+ // HFile trailer magic is at the end - put garbage there
+ bytes[96..100].copy_from_slice(b"BAAD");
+ let result = HFileReader::new(bytes);
+ assert!(result.is_err(), "Should fail for invalid magic");
+ }
+
+ // ================== Multi-Block Iteration Tests ==================
+
+ #[test]
+ fn test_iterate_across_multiple_blocks() {
+ // Use GZIP file with 20000 entries - spans multiple blocks
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let mut count = 0;
+ let mut prev_key: Option<String> = None;
+
+ for result in reader.iter().expect("Failed to create iterator") {
+ let kv = result.expect("Failed to read kv");
+ let key = kv.key().content_as_str().unwrap().to_string();
+
+ // Verify keys are in ascending order
+ if let Some(ref prev) = prev_key {
+ assert!(
+ key > *prev,
+ "Keys should be in ascending order: {} > {}",
+ key,
+ prev
+ );
+ }
+ prev_key = Some(key);
+ count += 1;
+ }
+
+ assert_eq!(count, 20000, "Should iterate all 20000 entries");
+ }
+
+ #[test]
+ fn test_seek_across_block_boundaries() {
+ // Use 512KB blocks with 20000 entries
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_512KB_GZ_20000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ reader.seek_to_first().expect("Failed to seek");
+
+ // Seek to various keys that likely span different blocks
+ let test_keys = [
+ "hudi-key-000000000", // First key
+ "hudi-key-000005000", // Middle
+ "hudi-key-000010000", // Another block
+ "hudi-key-000015000", // Another block
+ "hudi-key-000019999", // Last key
+ ];
+
+ for expected_key in test_keys {
+ let lookup = Utf8Key::new(expected_key);
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(
+ result,
+ SeekResult::Found,
+ "Should find key: {}",
+ expected_key
+ );
+
+ let kv = reader.get_key_value().expect("Failed to get kv").unwrap();
+ assert_eq!(
+ kv.key().content_as_str().unwrap(),
+ expected_key,
+ "Key mismatch"
+ );
+ }
+ }
+
+ #[test]
+ fn test_next_at_eof() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_NONE_5000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Seek to last key
+ reader.seek_to_first().expect("Failed to seek");
+ let lookup = Utf8Key::new("hudi-key-000004999");
+ let result = reader.seek_to(&lookup).expect("Failed to seek");
+ assert_eq!(result, SeekResult::Found);
+
+ // next() should return false at EOF
+ assert!(!reader.next().expect("Failed to next"));
+
+ // get_key_value should return None after EOF
+ assert!(reader.get_key_value().expect("Failed to get kv").is_none());
+ }
+
+ #[test]
+ fn test_collect_records_gzip() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ let records = reader.collect_records().expect("Failed to collect records");
+ assert_eq!(records.len(), 20000);
+
+ // Verify first and last records
+ assert_eq!(records[0].key_as_str(), Some("hudi-key-000000000"));
+ assert_eq!(records[19999].key_as_str(), Some("hudi-key-000019999"));
+ }
+
+ #[test]
+ fn test_lookup_records_across_blocks() {
+ let bytes = read_test_hfile("hudi_1_0_hbase_2_4_9_16KB_GZ_20000.hfile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create reader");
+
+ // Look up keys that span different blocks
+ let keys = vec![
+ "hudi-key-000000000",
+ "hudi-key-000005000",
+ "hudi-key-000010000",
+ "hudi-key-000015000",
+ "hudi-key-000019999",
+ "hudi-key-nonexistent",
+ ];
+
+ let results = reader.lookup_records(&keys).expect("Failed to lookup");
+ assert_eq!(results.len(), 6);
+
+ // First 5 should be found
+ for (key, value) in results.iter().take(5) {
+ assert!(value.is_some(), "Key {} should be found", key);
+ }
+
+ // Last one should not be found
+ assert!(
+ results[5].1.is_none(),
+ "Nonexistent key should not be found"
+ );
+ }
}
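
A small sketch of the new async factory in isolation (module paths assumed; `expect` used for brevity):

    use hudi_core::hfile::HFileReader; // module paths assumed
    use hudi_core::storage::Storage;

    // Open an HFile directly from storage, report the embedded Avro schema
    // (present for metadata-table HFiles), then collect every record.
    async fn dump_hfile(storage: &Storage, relative_path: &str) {
        let mut reader = HFileReader::open(storage, relative_path)
            .await
            .expect("open HFile from storage");
        if let Some(schema) = reader.get_avro_schema().expect("read file info") {
            println!("embedded Avro schema: {}", schema.canonical_form());
        }
        let records = reader.collect_records().expect("scan data blocks");
        println!("{} records in {}", records.len(), relative_path);
    }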
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
index e1c326b..6fe247c 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/metadata/mod.rs
@@ -31,3 +31,6 @@ pub const LAKE_FORMAT_METADATA_DIRS: &[&str; 3] = &[
DELTALAKE_METADATA_DIR,
ICEBERG_METADATA_DIR,
];
+
+/// The virtual partition field name used in metadata tables.
+pub const MDT_PARTITION_FIELD: &str = "partition";
diff --git a/crates/core/src/metadata/table_record.rs b/crates/core/src/metadata/table_record.rs
index cabb393..400116e 100644
--- a/crates/core/src/metadata/table_record.rs
+++ b/crates/core/src/metadata/table_record.rs
@@ -121,6 +121,9 @@ pub struct FilesPartitionRecord {
}
impl FilesPartitionRecord {
+ /// The partition name in the metadata table that stores file listings.
+ pub const PARTITION_NAME: &'static str = "files";
+
/// Check if this is an ALL_PARTITIONS record.
pub fn is_all_partitions(&self) -> bool {
self.record_type == MetadataRecordType::AllPartitions
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index deb5963..d1f4d35 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -106,6 +106,7 @@ impl Storage {
}
#[cfg(test)]
+ /// Get basic file metadata (name, size) without loading the file content.
async fn get_file_metadata_not_populated(&self, relative_path: &str) -> Result<FileMetadata> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
@@ -116,6 +117,7 @@ impl Storage {
Ok(FileMetadata::new(name.to_string(), meta.size))
}
+ /// Get full file metadata for a Parquet file, including record counts from Parquet metadata.
pub async fn get_file_metadata(&self, relative_path: &str) -> Result<FileMetadata> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 7081a64..2c72a82 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -94,13 +94,17 @@ pub mod partition;
mod validation;
use crate::config::read::HudiReadConfig;
-use crate::config::table::HudiTableConfig::PartitionFields;
+use crate::config::table::HudiTableConfig::{MetadataTablePartitions, PartitionFields};
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
+use crate::error::CoreError;
use crate::expr::filter::{from_str_tuples, Filter};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
+use crate::metadata::table_record::FilesPartitionRecord;
+use crate::metadata::MDT_PARTITION_FIELD;
use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
+use crate::storage::util::join_url_segments;
use crate::table::builder::TableBuilder;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
@@ -213,6 +217,64 @@ impl Table {
self.table_type() == TableTypeValue::MergeOnRead.as_ref()
}
+ /// Check if this table is a metadata table.
+ ///
+ /// Detection is based on the base path ending with `.hoodie/metadata`.
+ pub fn is_metadata_table(&self) -> bool {
+ let base_path: String = self
+ .hudi_configs
+ .get_or_default(HudiTableConfig::BasePath)
+ .into();
+ crate::util::path::is_metadata_table_path(&base_path)
+ }
+
+ /// Get the list of available metadata table partitions for this table.
+ ///
+ /// Returns the partitions configured in `hoodie.table.metadata.partitions`.
+ pub fn get_metadata_table_partitions(&self) -> Vec<String> {
+ self.hudi_configs
+ .get_or_default(MetadataTablePartitions)
+ .into()
+ }
+
+ /// Create a metadata table instance for this data table.
+ ///
+ /// Uses all partitions from `hoodie.table.metadata.partitions`
configuration.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if the metadata table cannot be created or if there are
+ /// no metadata table partitions configured.
+ pub async fn new_metadata_table(&self) -> Result<Table> {
+ if self.is_metadata_table() {
+ return Err(CoreError::MetadataTable(
+ "Cannot create metadata table from another metadata table".to_string(),
+ ));
+ }
+
+ let mdt_partitions = self.get_metadata_table_partitions();
+ if mdt_partitions.is_empty() {
+ return Err(CoreError::MetadataTable(
+ "No metadata table partitions configured".to_string(),
+ ));
+ }
+
+ let mdt_url = join_url_segments(&self.base_url(), &[".hoodie", "metadata"])?;
+ Table::new_with_options(
+ mdt_url.as_str(),
+ [(PartitionFields.as_ref(), MDT_PARTITION_FIELD)],
+ )
+ .await
+ }
+
+ /// Same as [Table::new_metadata_table], but blocking.
+ pub fn new_metadata_table_blocking(&self) -> Result<Table> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.new_metadata_table().await })
+ }
+
pub fn timezone(&self) -> String {
self.hudi_configs
.get_or_default(HudiTableConfig::TimelineTimezone)
@@ -260,7 +322,22 @@ impl Table {
}
/// Get the latest partition [arrow_schema::Schema] of the table.
+ ///
+ /// For metadata tables, returns a schema with a single `partition` field
+ /// typed as [arrow_schema::DataType::Utf8], since MDT uses a single partition
+ /// column to identify partitions like "files", "column_stats", etc.
+ ///
+ /// For regular tables, returns the partition fields with their actual data types
+ /// derived from the table schema.
pub async fn get_partition_schema(&self) -> Result<Schema> {
+ if self.is_metadata_table() {
+ return Ok(Schema::new(vec![Field::new(
+ MDT_PARTITION_FIELD,
+ arrow_schema::DataType::Utf8,
+ false,
+ )]));
+ }
+
let partition_fields: HashSet<String> = {
let fields: Vec<String> = self.hudi_configs.get_or_default(PartitionFields).into();
fields.into_iter().collect()
@@ -609,6 +686,56 @@ impl Table {
)
}
+ /// Read records from the "files" partition of the metadata table.
+ ///
+ /// This method can only be called on metadata tables. It reads all records
+ /// from the "files" partition and returns merged `FilesPartitionRecord`s.
+ ///
+ /// # Returns
+ /// A HashMap mapping record keys to their `FilesPartitionRecord`s.
+ pub async fn read_metadata_files(&self) -> Result<HashMap<String, FilesPartitionRecord>> {
+ if !self.is_metadata_table() {
+ return Err(CoreError::MetadataTable(
+ "read_metadata_files can only be called on metadata tables".to_string(),
+ ));
+ }
+
+ let Some(timestamp) = self.timeline.get_latest_commit_timestamp_as_option() else {
+ return Ok(HashMap::new());
+ };
+
+ let filters = from_str_tuples([(
+ MDT_PARTITION_FIELD,
+ "=",
+ FilesPartitionRecord::PARTITION_NAME,
+ )])?;
+ let file_slices = self.get_file_slices_internal(timestamp, &filters).await?;
+
+ if file_slices.len() != 1 {
+ return Err(CoreError::MetadataTable(format!(
+ "Expected 1 file slice for {} partition, got {}",
+ FilesPartitionRecord::PARTITION_NAME,
+ file_slices.len()
+ )));
+ }
+ let file_slice = &file_slices[0];
+
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp,
+ )])?;
+
+ fg_reader.read_file_slice_from_mdt(file_slice).await
+ }
+
+ /// Same as [Table::read_metadata_files], but blocking.
+ pub fn read_metadata_files_blocking(&self) -> Result<HashMap<String, FilesPartitionRecord>> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(self.read_metadata_files())
+ }
+
/// Get all the latest records in the table.
///
/// # Arguments
@@ -770,6 +897,7 @@ mod tests {
use crate::config::HUDI_CONF_DIR;
use crate::error::CoreError;
use crate::metadata::meta_field::MetaField;
+ use crate::metadata::table_record::MetadataRecordType;
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq, SampleTable};
@@ -1017,8 +1145,10 @@ mod tests {
assert_eq!(actual, "default");
let actual: bool = configs.get_or_default(DropsPartitionFields).into();
assert!(!actual);
- assert!(panic::catch_unwind(|| configs.get_or_default(IsHiveStylePartitioning)).is_err());
- assert!(panic::catch_unwind(|| configs.get_or_default(IsPartitionPathUrlencoded)).is_err());
+ let actual: bool = configs.get_or_default(IsHiveStylePartitioning).into();
+ assert!(!actual);
+ let actual: bool = configs.get_or_default(IsPartitionPathUrlencoded).into();
+ assert!(!actual);
assert!(panic::catch_unwind(|| configs.get_or_default(KeyGeneratorClass)).is_err());
let actual: Vec<String> = configs.get_or_default(PartitionFields).into();
assert!(actual.is_empty());
@@ -1440,4 +1570,87 @@ mod tests {
let expected = HashSet::new();
assert_eq!(actual, expected);
}
+
+ // =========================================================================
+ // Metadata Table Tests
+ // =========================================================================
+
+ fn get_data_table() -> Table {
+ use hudi_test::QuickstartTripsTable;
+ let table_path = QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
+ Table::new_blocking(&table_path).unwrap()
+ }
+
+ #[test]
+ fn hudi_table_read_metadata_files() {
+ let data_table = get_data_table();
+ let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+
+ assert!(metadata_table.is_metadata_table());
+
+ let records = metadata_table.read_metadata_files_blocking().unwrap();
+
+ // Should have 4 records: __all_partitions__ + 3 city partitions
+ assert_eq!(records.len(), 4);
+
+ // Validate __all_partitions__ record
+ let all_partitions = records.get("__all_partitions__").unwrap();
+ assert_eq!(
+ all_partitions.record_type,
+ MetadataRecordType::AllPartitions
+ );
+ let partition_names: HashSet<_> = all_partitions.partition_names().into_iter().collect();
+ assert_eq!(
+ partition_names,
+ HashSet::from(["city=chennai", "city=san_francisco", "city=sao_paulo"])
+ );
+
+ // Validate city=chennai record with actual file names
+ let chennai = records.get("city=chennai").unwrap();
+ assert_eq!(chennai.record_type, MetadataRecordType::Files);
+ let chennai_files: HashSet<_> = chennai.active_file_names().into_iter().collect();
+ assert_eq!(
+ chennai_files,
+ HashSet::from([
+ "6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_2-986-2794_20251220210108078.parquet",
+ "6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_0-1112-3190_20251220210129235.parquet",
+ ".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210127080.log.1_0-1072-3078",
+ ".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210128625.log.1_0-1097-3150",
+ ])
+ );
+ assert!(chennai.total_size() > 0);
+ }
+
+ #[test]
+ fn hudi_table_get_metadata_table_partitions() {
+ let data_table = get_data_table();
+
+ // Verify we can get the MDT partitions from the data table
+ let partitions = data_table.get_metadata_table_partitions();
+
+ // The test table has 5 MDT partitions configured
+ assert_eq!(
+ partitions.len(),
+ 5,
+ "Should have 5 MDT partitions, got: {:?}",
+ partitions
+ );
+
+ // Verify all expected partitions are present
+ let expected = [
+ "column_stats",
+ "files",
+ "partition_stats",
+ "record_index",
+ "secondary_index_rider_idx",
+ ];
+ for partition in &expected {
+ assert!(
+ partitions.contains(&partition.to_string()),
+ "Should contain '{}' partition, got: {:?}",
+ partition,
+ partitions
+ );
+ }
+ }
}
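
Putting the new Table APIs together, a hedged end-to-end sketch from a data table to its metadata table's `files` partition (crate path assumed; `expect` used for brevity):

    use hudi_core::table::Table; // crate path assumed

    fn list_partition_files(data_table_uri: &str) {
        let data_table = Table::new_blocking(data_table_uri).expect("open data table");
        // Only proceed if `files` is listed in hoodie.table.metadata.partitions.
        if !data_table
            .get_metadata_table_partitions()
            .contains(&"files".to_string())
        {
            return;
        }
        let mdt = data_table
            .new_metadata_table_blocking()
            .expect("derive metadata table");
        let records = mdt
            .read_metadata_files_blocking()
            .expect("read files partition");
        for (partition, record) in &records {
            println!("{partition}: {} bytes in active files", record.total_size());
        }
    }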
diff --git a/crates/core/src/table/validation.rs b/crates/core/src/table/validation.rs
index 3a9d161..381b7ea 100644
--- a/crates/core/src/table/validation.rs
+++ b/crates/core/src/table/validation.rs
@@ -18,13 +18,16 @@
*/
use crate::config::internal::HudiInternalConfig::SkipConfigValidation;
use crate::config::read::HudiReadConfig;
+use crate::config::table::BaseFileFormatValue;
use crate::config::table::HudiTableConfig;
use crate::config::table::HudiTableConfig::{
- DropsPartitionFields, TableVersion, TimelineLayoutVersion,
+ BaseFileFormat, BasePath, DropsPartitionFields, TableVersion, TimelineLayoutVersion,
};
use crate::config::HudiConfigs;
use crate::error::CoreError;
use crate::merge::record_merger::RecordMerger;
+use crate::util::path::is_metadata_table_path;
+use std::str::FromStr;
use strum::IntoEnumIterator;
pub fn validate_configs(hudi_configs: &HudiConfigs) -> crate::error::Result<()> {
@@ -68,6 +71,20 @@ pub fn validate_configs(hudi_configs: &HudiConfigs) -> crate::error::Result<()>
)));
}
+ // Validate HFile format is only used for metadata tables
+ if let Ok(base_file_format_str) = hudi_configs.get(BaseFileFormat) {
+ let format_str: String = base_file_format_str.into();
+ if let Ok(BaseFileFormatValue::HFile) = BaseFileFormatValue::from_str(&format_str) {
+ let base_path: String = hudi_configs.get_or_default(BasePath).into();
+ if !is_metadata_table_path(&base_path) {
+ return Err(CoreError::Unsupported(format!(
+ "Base file format '{}' is only valid for metadata tables",
+ format_str
+ )));
+ }
+ }
+ }
+
RecordMerger::validate_configs(hudi_configs)?;
Ok(())
@@ -182,4 +199,56 @@ mod tests {
panic!("Expected CoreError::Unsupported for v6 with layout 2");
}
}
+
+ #[test]
+ fn test_hfile_format_rejected_for_regular_table() {
+ use crate::config::table::HudiTableConfig::BaseFileFormat;
+
+ let mut options = HashMap::new();
+ options.insert(TableName.as_ref().to_string(), "test_table".to_string());
+ options.insert(TableType.as_ref().to_string(), "MERGE_ON_READ".to_string());
+ options.insert(TableVersion.as_ref().to_string(), "8".to_string());
+ options.insert(TimelineLayoutVersion.as_ref().to_string(), "2".to_string());
+ options.insert(BaseFileFormat.as_ref().to_string(), "hfile".to_string());
+ options.insert("hoodie.base.path".to_string(), "/data/my_table".to_string());
+
+ let configs = HudiConfigs::new(options);
+ let result = validate_configs(&configs);
+
+ assert!(result.is_err());
+ if let Err(CoreError::Unsupported(msg)) = result {
+ assert!(
+ msg.contains("only valid for metadata tables"),
+ "Unexpected message: {}",
+ msg
+ );
+ } else {
+ panic!("Expected CoreError::Unsupported for HFile on regular table");
+ }
+ }
+
+ #[test]
+ fn test_hfile_format_allowed_for_metadata_table() {
+ use crate::config::table::HudiTableConfig::BaseFileFormat;
+
+ let mut options = HashMap::new();
+ options.insert(TableName.as_ref().to_string(), "metadata".to_string());
+ options.insert(TableType.as_ref().to_string(), "MERGE_ON_READ".to_string());
+ options.insert(TableVersion.as_ref().to_string(), "8".to_string());
+ options.insert(TimelineLayoutVersion.as_ref().to_string(), "2".to_string());
+ options.insert(BaseFileFormat.as_ref().to_string(), "hfile".to_string());
+ options.insert(
+ "hoodie.base.path".to_string(),
+ "/data/my_table/.hoodie/metadata".to_string(),
+ );
+
+ let configs = HudiConfigs::new(options);
+ let result = validate_configs(&configs);
+
+ assert!(
+ result.is_ok(),
+ "HFile format should be allowed for metadata table: {:?}",
+ result
+ );
+ }
}
diff --git a/crates/core/src/timeline/instant.rs b/crates/core/src/timeline/instant.rs
index 1cd29af..4138fbe 100644
--- a/crates/core/src/timeline/instant.rs
+++ b/crates/core/src/timeline/instant.rs
@@ -194,9 +194,35 @@ impl Instant {
Ok(())
}
+ /// Parse a timestamp string into a UTC DateTime.
+ ///
+ /// Supports two formats:
+ /// 1. Date format: `yyyyMMddHHmmss` (14 chars) or `yyyyMMddHHmmssSSS` (17 chars)
+ /// 2. Epoch milliseconds: 17-digit number representing milliseconds since Unix epoch
+ /// (used by metadata table for timestamps like `00000000000000000`)
+ ///
+ /// The function tries date format first, then falls back to epoch milliseconds
+ /// if the date parsing fails (e.g., invalid month/day like `00`).
pub fn parse_datetime(timestamp: &str, timezone: &str) -> Result<DateTime<Utc>> {
- let naive_dt = Self::parse_naive_datetime(timestamp)?;
- Self::convert_to_timezone(naive_dt, timezone)
+ // First try parsing as yyyyMMddHHmmssSSS date format
+ if let Ok(naive_dt) = Self::parse_naive_datetime(timestamp) {
+ return Self::convert_to_timezone(naive_dt, timezone);
+ }
+
+ // Fallback: treat as epoch milliseconds (zero-padded 17-digit number)
+ // This handles metadata table timestamps like 00000000000000000, 00000000000000001, etc.
+ if timestamp.len() == 17 && timestamp.chars().all(|c| c.is_ascii_digit()) {
+ let epoch_ms: i64 = timestamp
+ .parse()
+ .map_err(|e| CoreError::Timeline(format!("Invalid epoch timestamp: {}", e)))?;
+ return DateTime::from_timestamp_millis(epoch_ms)
+ .ok_or_else(|| CoreError::Timeline("Invalid epoch millis".to_string()));
+ }
+
+ Err(CoreError::Timeline(format!(
+ "Cannot parse timestamp '{}': not a valid date format or epoch millis",
+ timestamp
+ )))
}
fn parse_naive_datetime(timestamp: &str) -> Result<NaiveDateTime> {
@@ -538,4 +564,53 @@ mod tests {
let result = Instant::from_str("202403151425301.commit");
assert!(result.is_err());
}
+
+ #[test]
+ fn test_parse_datetime_epoch_millis() -> Result<()> {
+ // Epoch 0 (1970-01-01 00:00:00.000 UTC)
+ let dt = Instant::parse_datetime("00000000000000000", "UTC")?;
+ assert_eq!(dt.timestamp_millis(), 0);
+
+ // Epoch 1ms
+ let dt = Instant::parse_datetime("00000000000000001", "UTC")?;
+ assert_eq!(dt.timestamp_millis(), 1);
+
+ // Epoch 1000ms (1 second)
+ let dt = Instant::parse_datetime("00000000000001000", "UTC")?;
+ assert_eq!(dt.timestamp_millis(), 1000);
+
+ // A larger epoch value
+ let dt = Instant::parse_datetime("00001734567890123", "UTC")?;
+ assert_eq!(dt.timestamp_millis(), 1734567890123);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_parse_datetime_epoch_ordering() -> Result<()> {
+ // Verify ordering: epoch 0 < epoch 1 < epoch 2 < ... < real timestamp
+ let epoch_0 = Instant::parse_datetime("00000000000000000", "UTC")?;
+ let epoch_1 = Instant::parse_datetime("00000000000000001", "UTC")?;
+ let epoch_2 = Instant::parse_datetime("00000000000000002", "UTC")?;
+ let real_ts = Instant::parse_datetime("20240315142530500", "UTC")?;
+
+ assert!(epoch_0 < epoch_1);
+ assert!(epoch_1 < epoch_2);
+ assert!(epoch_2 < real_ts);
+
+ Ok(())
+ }
+
+ #[test]
+ fn test_parse_datetime_date_format_still_works() -> Result<()> {
+ // Valid date format should still parse correctly (not treated as epoch)
+ let dt = Instant::parse_datetime("20240101120000000", "UTC")?;
+ // This is Jan 1, 2024, 12:00:00.000 UTC - NOT epoch 20240101120000000
+ assert_eq!(
+ dt.format("%Y-%m-%d %H:%M:%S").to_string(),
+ "2024-01-01 12:00:00"
+ );
+
+ Ok(())
+ }
}
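
For illustration, a short sketch of the two accepted timestamp forms side by side (module path and error alias assumed):

    use hudi_core::timeline::instant::Instant; // module path assumed

    // A regular yyyyMMddHHmmssSSS instant and a zero-padded 17-digit
    // epoch-millis value (as written by the metadata table) both parse,
    // and they order correctly against each other.
    fn demo_parse() -> hudi_core::error::Result<()> {
        let wall_clock = Instant::parse_datetime("20240315142530500", "UTC")?;
        let epoch_zero = Instant::parse_datetime("00000000000000000", "UTC")?;
        assert_eq!(epoch_zero.timestamp_millis(), 0);
        assert!(epoch_zero < wall_clock);
        Ok(())
    }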
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index c9aa85b..744d992 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -18,3 +18,4 @@
*/
pub mod arrow;
pub mod collection;
+pub mod path;
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/util/path.rs
similarity index 51%
copy from crates/core/src/metadata/mod.rs
copy to crates/core/src/util/path.rs
index e1c326b..77a167b 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/util/path.rs
@@ -16,18 +16,25 @@
* specific language governing permissions and limitations
* under the License.
*/
-pub mod commit;
-pub mod merger;
-pub mod meta_field;
-pub mod replace_commit;
-pub mod table_record;
-pub const HUDI_METADATA_DIR: &str = ".hoodie";
-pub const DELTALAKE_METADATA_DIR: &str = "_delta_log";
-pub const ICEBERG_METADATA_DIR: &str = "metadata";
+/// Check if the given path is a metadata table path.
+///
+/// Detection is based on the path ending with `.hoodie/metadata`.
+pub fn is_metadata_table_path(path: &str) -> bool {
+ path.trim_end_matches('/').ends_with(".hoodie/metadata")
+}
-pub const LAKE_FORMAT_METADATA_DIRS: &[&str; 3] = &[
- HUDI_METADATA_DIR,
- DELTALAKE_METADATA_DIR,
- ICEBERG_METADATA_DIR,
-];
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_is_metadata_table_path() {
+ assert!(is_metadata_table_path("/data/my_table/.hoodie/metadata"));
+ assert!(is_metadata_table_path("/data/my_table/.hoodie/metadata/"));
+ assert!(is_metadata_table_path("s3://bucket/table/.hoodie/metadata"));
+ assert!(!is_metadata_table_path("/data/my_table"));
+ assert!(!is_metadata_table_path("/data/my_table/.hoodie"));
+ assert!(!is_metadata_table_path("/data/.hoodie/metadata/files"));
+ }
+}