This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 3e3a2cf refactor: add targeted MDT lookup APIs for partition pruning
(#502)
3e3a2cf is described below
commit 3e3a2cf11bb72b8d12d1271d56f82f7385304d78
Author: Shiyan Xu <[email protected]>
AuthorDate: Tue Dec 30 06:09:53 2025 -0600
refactor: add targeted MDT lookup APIs for partition pruning (#502)
---
crates/core/src/file_group/base_file.rs | 4 +-
crates/core/src/file_group/builder.rs | 165 +++++---
crates/core/src/file_group/log_file/content.rs | 2 +-
crates/core/src/file_group/log_file/mod.rs | 4 +-
crates/core/src/file_group/log_file/scanner.rs | 6 +-
crates/core/src/file_group/reader.rs | 303 ++++----------
crates/core/src/hfile/reader.rs | 14 +-
crates/core/src/hfile/record.rs | 10 +-
crates/core/src/metadata/merger.rs | 102 ++++-
crates/core/src/metadata/mod.rs | 3 +-
crates/core/src/metadata/table/mod.rs | 441 +++++++++++++++++++++
.../metadata/{table_record.rs => table/records.rs} | 73 +++-
crates/core/src/table/fs_view.rs | 205 +++++-----
crates/core/src/table/listing.rs | 22 +-
crates/core/src/table/mod.rs | 430 +-------------------
crates/core/src/timeline/completion_time.rs | 202 +---------
crates/core/src/timeline/mod.rs | 68 ++--
crates/core/src/timeline/view.rs | 312 +++++++++++++++
18 files changed, 1307 insertions(+), 1059 deletions(-)
diff --git a/crates/core/src/file_group/base_file.rs
b/crates/core/src/file_group/base_file.rs
index 258b837..389afe6 100644
--- a/crates/core/src/file_group/base_file.rs
+++ b/crates/core/src/file_group/base_file.rs
@@ -18,7 +18,7 @@
*/
use crate::error::CoreError;
use crate::storage::file_metadata::FileMetadata;
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
+use crate::timeline::completion_time::CompletionTimeView;
use crate::Result;
use std::fmt::Display;
use std::str::FromStr;
@@ -106,7 +106,7 @@ impl BaseFile {
///
/// For v6 tables, the view returns `None` and this is a no-op.
/// For v8+ tables, this sets the completion timestamp for completed
commits.
- pub fn set_completion_time<V: TimelineViewByCompletionTime>(&mut self,
view: &V) {
+ pub fn set_completion_time<V: CompletionTimeView>(&mut self, view: &V) {
self.completion_timestamp = view
.get_completion_time(&self.commit_timestamp)
.map(|s| s.to_string());
diff --git a/crates/core/src/file_group/builder.rs
b/crates/core/src/file_group/builder.rs
index 2c3374b..3df044f 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -23,7 +23,7 @@ use crate::file_group::FileGroup;
use crate::metadata::commit::HoodieCommitMetadata;
use crate::metadata::replace_commit::HoodieReplaceCommitMetadata;
use crate::metadata::table_record::FilesPartitionRecord;
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
+use crate::timeline::completion_time::CompletionTimeView;
use crate::Result;
use dashmap::DashMap;
use serde_json::{Map, Value};
@@ -64,12 +64,7 @@ impl FileGroupMerger for HashSet<FileGroup> {
///
/// * `commit_metadata` - The commit metadata JSON map
/// * `completion_time_view` - View to look up completion timestamps.
-/// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for all
lookups)
-/// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
-///
-/// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
-/// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
-pub fn file_groups_from_commit_metadata<V: TimelineViewByCompletionTime>(
+pub fn file_groups_from_commit_metadata<V: CompletionTimeView>(
commit_metadata: &Map<String, Value>,
completion_time_view: &V,
) -> Result<HashSet<FileGroup>> {
@@ -153,15 +148,10 @@ pub fn replaced_file_groups_from_replace_commit(
/// * `records` - Metadata table files partition records (partition_path ->
FilesPartitionRecord)
/// * `base_file_extension` - The base file format extension (e.g., "parquet",
"hfile")
/// * `completion_time_view` - View to look up completion timestamps.
-/// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for all
lookups)
-/// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
-///
-/// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
-/// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
///
/// # Returns
/// A map of partition paths to their FileGroups.
-pub fn file_groups_from_files_partition_records<V:
TimelineViewByCompletionTime>(
+pub fn file_groups_from_files_partition_records<V: CompletionTimeView>(
records: &HashMap<String, FilesPartitionRecord>,
base_file_extension: &str,
completion_time_view: &V,
@@ -190,7 +180,7 @@ pub fn file_groups_from_files_partition_records<V:
TimelineViewByCompletionTime>
})?;
log_file.set_completion_time(completion_time_view);
- // Filter uncommitted files for v8+ tables
+ // Filter uncommitted files for timeline layout v2
if completion_time_view.should_filter_uncommitted()
&& log_file.completion_timestamp.is_none()
{
@@ -211,7 +201,7 @@ pub fn file_groups_from_files_partition_records<V:
TimelineViewByCompletionTime>
})?;
base_file.set_completion_time(completion_time_view);
- // Filter uncommitted files for v8+ tables
+ // Filter uncommitted files for timeline layout v2
if completion_time_view.should_filter_uncommitted()
&& base_file.completion_timestamp.is_none()
{
@@ -285,8 +275,34 @@ mod tests {
mod test_file_groups_from_commit_metadata {
use super::super::*;
- use crate::timeline::completion_time::{CompletionTimeView,
V6CompletionTimeView};
+ use crate::config::HudiConfigs;
+ use crate::timeline::instant::{Action, Instant, State};
+ use crate::timeline::view::TimelineView;
use serde_json::{json, Map, Value};
+ use std::collections::HashSet;
+ use std::sync::Arc;
+
+ fn create_layout_v1_view() -> TimelineView {
+ let configs =
Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "1")]));
+ TimelineView::new(
+ "99999999999999999".to_string(),
+ None,
+ &[] as &[Instant],
+ HashSet::new(),
+ &configs,
+ )
+ }
+
+ fn create_layout_v2_view(instants: &[Instant]) -> TimelineView {
+ let configs =
Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "2")]));
+ TimelineView::new(
+ "99999999999999999".to_string(),
+ None,
+ instants,
+ HashSet::new(),
+ &configs,
+ )
+ }
#[test]
fn test_missing_partition_to_write_stats() {
@@ -298,8 +314,8 @@ mod tests {
.unwrap()
.clone();
- // Use V6CompletionTimeView for v6 table behavior (no completion
time tracking)
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ // Use layout v1 view (no completion time tracking)
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
// With the new implementation, this returns Ok with an empty
HashSet
// because iter_write_stats() returns an empty iterator when
partition_to_write_stats is None
assert!(result.is_ok());
@@ -317,7 +333,7 @@ mod tests {
.unwrap()
.clone();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
assert!(matches!(
result,
Err(CoreError::CommitMetadata(msg)) if msg.contains("Failed to
parse commit metadata")
@@ -337,7 +353,7 @@ mod tests {
.unwrap()
.clone();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
assert!(matches!(
result,
Err(CoreError::CommitMetadata(msg)) if msg == "Missing fileId
in write stats"
@@ -357,7 +373,7 @@ mod tests {
.unwrap()
.clone();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
assert!(matches!(
result,
Err(CoreError::CommitMetadata(msg)) if msg == "Missing path in
write stats"
@@ -378,7 +394,7 @@ mod tests {
.unwrap()
.clone();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
assert!(matches!(
result,
Err(CoreError::CommitMetadata(msg)) if msg == "Invalid file
name in path"
@@ -399,7 +415,7 @@ mod tests {
.unwrap()
.clone();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
// Serde will fail to parse this and return a deserialization error
assert!(matches!(
result,
@@ -421,7 +437,7 @@ mod tests {
.unwrap()
.clone();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
// Serde will fail to parse this and return a deserialization error
assert!(matches!(
result,
@@ -445,7 +461,7 @@ mod tests {
}"#;
let metadata: Map<String, Value> =
serde_json::from_str(sample_json).unwrap();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
assert!(result.is_ok());
let file_groups = result.unwrap();
assert_eq!(file_groups.len(), 2);
@@ -475,7 +491,7 @@ mod tests {
}"#;
let metadata: Map<String, Value> =
serde_json::from_str(sample_json).unwrap();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
assert!(result.is_ok());
let file_groups = result.unwrap();
assert_eq!(file_groups.len(), 1);
@@ -507,7 +523,7 @@ mod tests {
}"#;
let metadata: Map<String, Value> =
serde_json::from_str(sample_json).unwrap();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
assert!(result.is_ok());
let file_groups = result.unwrap();
assert_eq!(file_groups.len(), 1);
@@ -531,7 +547,7 @@ mod tests {
}"#;
let metadata: Map<String, Value> =
serde_json::from_str(sample_json).unwrap();
- let result = file_groups_from_commit_metadata(&metadata,
&V6CompletionTimeView::new());
+ let result = file_groups_from_commit_metadata(&metadata,
&create_layout_v1_view());
assert!(result.is_ok());
let file_groups = result.unwrap();
assert_eq!(file_groups.len(), 1);
@@ -555,7 +571,6 @@ mod tests {
let metadata: Map<String, Value> =
serde_json::from_str(sample_json).unwrap();
// Create a completion time view with the mapping for this file's
request timestamp
- use crate::timeline::instant::{Action, Instant, State};
let instants = vec![Instant {
timestamp: "20240418173200000".to_string(),
completion_timestamp: Some("20240418173210000".to_string()),
@@ -563,7 +578,7 @@ mod tests {
state: State::Completed,
epoch_millis: 0,
}];
- let view = CompletionTimeView::from_instants(&instants);
+ let view = create_layout_v2_view(&instants);
let result = file_groups_from_commit_metadata(&metadata, &view);
assert!(result.is_ok());
@@ -581,12 +596,36 @@ mod tests {
mod test_file_groups_from_files_partition_records {
use super::super::*;
+ use crate::config::HudiConfigs;
use crate::metadata::table_record::{
FilesPartitionRecord, HoodieMetadataFileInfo, MetadataRecordType,
};
- use crate::timeline::completion_time::{CompletionTimeView,
V6CompletionTimeView};
use crate::timeline::instant::{Action, Instant, State};
- use std::collections::HashMap;
+ use crate::timeline::view::TimelineView;
+ use std::collections::{HashMap, HashSet};
+ use std::sync::Arc;
+
+ fn create_layout_v1_view() -> TimelineView {
+ let configs =
Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "1")]));
+ TimelineView::new(
+ "99999999999999999".to_string(),
+ None,
+ &[] as &[Instant],
+ HashSet::new(),
+ &configs,
+ )
+ }
+
+ fn create_layout_v2_view(instants: &[Instant]) -> TimelineView {
+ let configs =
Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "2")]));
+ TimelineView::new(
+ "99999999999999999".to_string(),
+ None,
+ instants,
+ HashSet::new(),
+ &configs,
+ )
+ }
fn create_file_info(name: &str, size: i64, is_deleted: bool) ->
HoodieMetadataFileInfo {
HoodieMetadataFileInfo::new(name.to_string(), size, is_deleted)
@@ -616,9 +655,9 @@ mod tests {
files_map.insert(partition.to_string(),
create_file_info(partition, 0, false));
}
(
- "__all_partitions__".to_string(),
+ FilesPartitionRecord::ALL_PARTITIONS_KEY.to_string(),
FilesPartitionRecord {
- key: "__all_partitions__".to_string(),
+ key: FilesPartitionRecord::ALL_PARTITIONS_KEY.to_string(),
record_type: MetadataRecordType::AllPartitions,
files: files_map,
},
@@ -631,7 +670,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -647,7 +686,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -670,7 +709,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -703,7 +742,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -739,7 +778,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -765,7 +804,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -793,7 +832,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -819,7 +858,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_err());
let err = result.unwrap_err();
@@ -845,7 +884,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_err());
let err = result.unwrap_err();
@@ -855,7 +894,7 @@ mod tests {
}
#[test]
- fn test_v8_uncommitted_base_file_filtered() {
+ fn test_layout_v2_uncommitted_base_file_filtered() {
let mut records = HashMap::new();
// A base file with timestamp that is NOT in the completion view
(uncommitted)
let (key, record) = create_files_record(
@@ -865,7 +904,7 @@ mod tests {
records.insert(key, record);
// Create a non-empty completion view that does NOT include the
file's timestamp
- // This simulates an uncommitted file in v8 tables
+ // This simulates an uncommitted file in timeline layout v2
// The view has OTHER commits, so should_filter_uncommitted()
returns true
let instants = vec![Instant {
timestamp: "20240418173199999".to_string(), // Different
timestamp
@@ -874,7 +913,7 @@ mod tests {
state: State::Completed,
epoch_millis: 0,
}];
- let view = CompletionTimeView::from_instants(&instants);
+ let view = create_layout_v2_view(&instants);
let result = file_groups_from_files_partition_records(&records,
"parquet", &view);
assert!(result.is_ok());
@@ -886,10 +925,10 @@ mod tests {
}
#[test]
- fn test_empty_completion_view_no_filtering() {
- // Empty completion view means no completion time data available
- // In this case, should_filter_uncommitted() returns false
- // and all files are included (v6 behavior)
+ fn test_empty_completion_view_filters_uncommitted() {
+ // For timeline layout v2 with no completion time data (empty
instants),
+ // all files are filtered because their request timestamps don't
have
+ // corresponding completion timestamps in the view.
let mut records = HashMap::new();
let (key, record) = create_files_record(
"partition1",
@@ -897,18 +936,18 @@ mod tests {
);
records.insert(key, record);
- let view = CompletionTimeView::from_instants(&[]);
+ let view = create_layout_v2_view(&[]);
let result = file_groups_from_files_partition_records(&records,
"parquet", &view);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
- // With empty view, no filtering happens
- assert_eq!(file_groups_map.len(), 1);
+ // Layout v2 with empty view filters all files (no completion
timestamps available)
+ assert!(file_groups_map.is_empty());
}
#[test]
- fn test_v8_committed_base_file_included() {
+ fn test_layout_v2_committed_base_file_included() {
let mut records = HashMap::new();
let (key, record) = create_files_record(
"partition1",
@@ -924,7 +963,7 @@ mod tests {
state: State::Completed,
epoch_millis: 0,
}];
- let view = CompletionTimeView::from_instants(&instants);
+ let view = create_layout_v2_view(&instants);
let result = file_groups_from_files_partition_records(&records,
"parquet", &view);
assert!(result.is_ok());
@@ -943,7 +982,7 @@ mod tests {
}
#[test]
- fn test_v8_uncommitted_log_file_filtered() {
+ fn test_layout_v2_uncommitted_log_file_filtered() {
let mut records = HashMap::new();
// Log file has a DIFFERENT base commit timestamp
(20240418173205000)
// that is NOT in the completion view
@@ -967,7 +1006,7 @@ mod tests {
state: State::Completed,
epoch_millis: 0,
}];
- let view = CompletionTimeView::from_instants(&instants);
+ let view = create_layout_v2_view(&instants);
let result = file_groups_from_files_partition_records(&records,
"parquet", &view);
assert!(result.is_ok());
@@ -984,7 +1023,7 @@ mod tests {
}
#[test]
- fn test_v8_committed_log_file_included() {
+ fn test_layout_v2_committed_log_file_included() {
let mut records = HashMap::new();
let (key, record) = create_files_record(
"partition1",
@@ -1006,7 +1045,7 @@ mod tests {
state: State::Completed,
epoch_millis: 0,
}];
- let view = CompletionTimeView::from_instants(&instants);
+ let view = create_layout_v2_view(&instants);
let result = file_groups_from_files_partition_records(&records,
"parquet", &view);
assert!(result.is_ok());
@@ -1038,7 +1077,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -1063,7 +1102,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -1091,7 +1130,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"parquet",
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
@@ -1112,7 +1151,7 @@ mod tests {
let result = file_groups_from_files_partition_records(
&records,
"hfile", // Different base file extension
- &V6CompletionTimeView::new(),
+ &create_layout_v1_view(),
);
assert!(result.is_ok());
let file_groups_map = result.unwrap();
diff --git a/crates/core/src/file_group/log_file/content.rs
b/crates/core/src/file_group/log_file/content.rs
index 50ce84c..11e3eb1 100644
--- a/crates/core/src/file_group/log_file/content.rs
+++ b/crates/core/src/file_group/log_file/content.rs
@@ -232,7 +232,7 @@ impl Decoder {
///
/// HFile blocks are used in metadata table log files. Unlike Avro/Parquet
blocks,
/// the content is NOT converted to Arrow RecordBatch because:
- /// - MDT operations need key-based lookup/merge
+ /// - Metadata table operations need key-based lookup/merge
/// - Values are Avro-serialized payloads decoded on demand
///
/// The HFile content structure:
diff --git a/crates/core/src/file_group/log_file/mod.rs
b/crates/core/src/file_group/log_file/mod.rs
index c8ce94e..9b634f7 100644
--- a/crates/core/src/file_group/log_file/mod.rs
+++ b/crates/core/src/file_group/log_file/mod.rs
@@ -18,7 +18,7 @@
*/
use crate::error::CoreError;
use crate::storage::file_metadata::FileMetadata;
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
+use crate::timeline::completion_time::CompletionTimeView;
use crate::Result;
use std::cmp::Ordering;
use std::fmt::Display;
@@ -148,7 +148,7 @@ impl LogFile {
///
/// For v6 tables, the view returns `None` and this is a no-op.
/// For v8+ tables, this sets the completion timestamp for completed
commits.
- pub fn set_completion_time<V: TimelineViewByCompletionTime>(&mut self,
view: &V) {
+ pub fn set_completion_time<V: CompletionTimeView>(&mut self, view: &V) {
self.completion_timestamp = view
.get_completion_time(&self.timestamp)
.map(|s| s.to_string());
diff --git a/crates/core/src/file_group/log_file/scanner.rs
b/crates/core/src/file_group/log_file/scanner.rs
index 1614023..90df710 100644
--- a/crates/core/src/file_group/log_file/scanner.rs
+++ b/crates/core/src/file_group/log_file/scanner.rs
@@ -304,7 +304,9 @@ mod tests {
use crate::config::HudiConfigs;
use crate::file_group::record_batches::RecordBatches;
use crate::hfile::HFileReader;
- use
crate::metadata::table_record::decode_files_partition_record_with_schema;
+ use crate::metadata::table_record::{
+ decode_files_partition_record_with_schema, FilesPartitionRecord,
+ };
use crate::storage::util::parse_uri;
use apache_avro::Schema as AvroSchema;
use hudi_test::QuickstartTripsTable;
@@ -511,7 +513,7 @@ mod tests {
// Validate __all_partitions__ records
let all_partitions = records_by_key
- .get("__all_partitions__")
+ .get(FilesPartitionRecord::ALL_PARTITIONS_KEY)
.expect("Should have __all_partitions__ records");
assert_eq!(
all_partitions.len(),
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index 0551eeb..063792e 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -26,7 +26,7 @@ use crate::expr::filter::{Filter, SchemableFilter};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::log_file::scanner::{LogFileScanner, ScanResult};
use crate::file_group::record_batches::RecordBatches;
-use crate::hfile::HFileReader;
+use crate::hfile::{HFileReader, HFileRecord};
use crate::merge::record_merger::RecordMerger;
use crate::metadata::merger::FilesPartitionMerger;
use crate::metadata::meta_field::MetaField;
@@ -340,50 +340,32 @@ impl FileGroupReader {
crate::util::path::is_metadata_table_path(&base_path)
}
- /// Reads a metadata table file slice and returns merged
FilesPartitionRecords.
- ///
- /// This method is specifically designed for reading the metadata table's
`files` partition,
- /// which uses HFile format for base files and HFile blocks in log files.
+ /// Read records from metadata table files partition.
///
/// # Arguments
- /// * `base_file_path` - Relative path to the HFile base file
- /// * `log_file_paths` - Relative paths to log files
+ /// * `file_slice` - The file slice to read from
+ /// * `keys` - Only read records with these keys. If empty, reads all
records.
///
/// # Returns
- /// A HashMap mapping record keys (partition paths or
"__all_partitions__") to
- /// merged `FilesPartitionRecord`s containing file information.
- ///
- /// # Example
- /// ```ignore
- /// let reader =
FileGroupReader::new_with_options(metadata_table_base_uri, options)?;
- /// let merged = reader.read_file_slice_from_metadata_table_paths(
- /// "files/files-0000-0_0-0-0_00000000000000.hfile",
- /// vec!["files/.files-0000-0_20240101120000000.log.1_0-100-200"],
- /// ).await?;
- ///
- /// // Get file listing for a partition
- /// if let Some(record) = merged.get("city=chennai") {
- /// for file_name in record.active_file_names() {
- /// println!("Active file: {}", file_name);
- /// }
- /// }
- /// ```
- pub async fn read_file_slice_from_metadata_table_paths<I, S>(
+ /// HashMap containing the requested keys (or all keys if `keys` is empty).
+ pub(crate) async fn read_metadata_table_files_partition(
&self,
- base_file_path: &str,
- log_file_paths: I,
- ) -> Result<HashMap<String, FilesPartitionRecord>>
- where
- I: IntoIterator<Item = S>,
- S: AsRef<str>,
- {
- let log_file_paths: Vec<String> = log_file_paths
- .into_iter()
- .map(|s| s.as_ref().to_string())
- .collect();
+ file_slice: &FileSlice,
+ keys: &[&str],
+ ) -> Result<HashMap<String, FilesPartitionRecord>> {
+ let base_file_path = file_slice.base_file_relative_path()?;
+ let log_file_paths: Vec<String> = if file_slice.has_log_file() {
+ file_slice
+ .log_files
+ .iter()
+ .map(|log_file| file_slice.log_file_relative_path(log_file))
+ .collect::<Result<Vec<String>>>()?
+ } else {
+ vec![]
+ };
- // Read HFile base file using async open
- let mut hfile_reader = HFileReader::open(&self.storage, base_file_path)
+ // Open HFile
+ let mut hfile_reader = HFileReader::open(&self.storage,
&base_file_path)
.await
.map_err(|e| {
ReadFileSliceError(format!(
@@ -399,10 +381,20 @@ impl FileGroupReader {
.ok_or_else(|| ReadFileSliceError("No Avro schema found in
HFile".to_string()))?
.clone();
- // Collect base records
- let base_records = hfile_reader
- .collect_records()
- .map_err(|e| ReadFileSliceError(format!("Failed to collect HFile
records: {:?}", e)))?;
+ // Read base records: all if keys is empty, targeted lookup otherwise
+ let base_records: Vec<HFileRecord> = if keys.is_empty() {
+ hfile_reader.collect_records().map_err(|e| {
+ ReadFileSliceError(format!("Failed to collect HFile records:
{:?}", e))
+ })?
+ } else {
+ let lookup_results = hfile_reader.lookup_records(keys).map_err(|e|
{
+ ReadFileSliceError(format!("Failed to lookup HFile records:
{:?}", e))
+ })?;
+ lookup_results
+ .into_iter()
+ .filter_map(|(_, opt_record)| opt_record)
+ .collect()
+ };
// Scan log files if present
let log_records = if log_file_paths.is_empty() {
@@ -424,59 +416,9 @@ impl FileGroupReader {
}
};
- // Merge base and log records
+ // Merge records (key filtering is applied internally when keys is
non-empty)
let merger = FilesPartitionMerger::new(schema);
- merger.merge(&base_records, &log_records)
- }
-
- /// Same as [FileGroupReader::read_file_slice_from_metadata_table_paths],
but blocking.
- pub fn read_file_slice_from_metadata_table_paths_blocking<I, S>(
- &self,
- base_file_path: &str,
- log_file_paths: I,
- ) -> Result<HashMap<String, FilesPartitionRecord>>
- where
- I: IntoIterator<Item = S>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(
- self.read_file_slice_from_metadata_table_paths(base_file_path,
log_file_paths),
- )
- }
-
- /// Reads a metadata table file slice using a FileSlice object.
- ///
- /// Convenience wrapper around
[FileGroupReader::read_file_slice_from_metadata_table_paths].
- pub async fn read_file_slice_from_metadata_table(
- &self,
- file_slice: &FileSlice,
- ) -> Result<HashMap<String, FilesPartitionRecord>> {
- let base_file_path = file_slice.base_file_relative_path()?;
- let log_file_paths = if file_slice.has_log_file() {
- file_slice
- .log_files
- .iter()
- .map(|log_file| file_slice.log_file_relative_path(log_file))
- .collect::<Result<Vec<String>>>()?
- } else {
- vec![]
- };
- self.read_file_slice_from_metadata_table_paths(&base_file_path,
log_file_paths)
- .await
- }
-
- /// Same as [FileGroupReader::read_file_slice_from_metadata_table], but
blocking.
- pub fn read_file_slice_from_metadata_table_blocking(
- &self,
- file_slice: &FileSlice,
- ) -> Result<HashMap<String, FilesPartitionRecord>> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(self.read_file_slice_from_metadata_table(file_slice))
+ merger.merge_for_keys(&base_records, &log_records, keys)
}
}
@@ -841,151 +783,74 @@ mod tests {
"files/.files-0000-0_20251220210130911.log.1_3-1149-3338",
];
- #[test]
- fn test_read_file_slice_from_metadata_table_paths_without_log_files() ->
Result<()> {
- use crate::metadata::table_record::MetadataRecordType;
-
- let reader = create_metadata_table_reader()?;
-
- // Read base file only (no log files)
- let log_files: Vec<&str> = vec![];
- let merged = reader.read_file_slice_from_metadata_table_paths_blocking(
- METADATA_TABLE_FILES_BASE_FILE,
- log_files,
- )?;
-
- // Initial base file only has __all_partitions__ record
- // City partition records are added through log files
- assert_eq!(merged.len(), 1, "Base file should have 1 key");
- assert!(merged.contains_key("__all_partitions__"));
+ fn create_test_file_slice() -> Result<FileSlice> {
+ use crate::file_group::FileGroup;
- // Validate __all_partitions__ record
- let all_parts = merged.get("__all_partitions__").unwrap();
- assert_eq!(all_parts.record_type, MetadataRecordType::AllPartitions);
+ let mut fg = FileGroup::new("files-0000-0".to_string(),
"files".to_string());
+ let base_file_name = METADATA_TABLE_FILES_BASE_FILE
+ .strip_prefix("files/")
+ .unwrap();
+ fg.add_base_file_from_name(base_file_name)?;
+ let log_file_names: Vec<_> = METADATA_TABLE_FILES_LOG_FILES
+ .iter()
+ .map(|s| s.strip_prefix("files/").unwrap())
+ .collect();
+ fg.add_log_files_from_names(log_file_names)?;
- Ok(())
+ Ok(fg
+ .get_file_slice_as_of("99999999999999999")
+ .expect("Should have file slice")
+ .clone())
}
- #[test]
- fn test_read_file_slice_from_metadata_table_paths_with_log_files() ->
Result<()> {
- use crate::metadata::table_record::MetadataRecordType;
+ #[tokio::test]
+ async fn test_read_metadata_table_files_partition() -> Result<()> {
+ use crate::metadata::table_record::{FilesPartitionRecord,
MetadataRecordType};
let reader = create_metadata_table_reader()?;
+ let file_slice = create_test_file_slice()?;
- // Read base file + all log files
- let merged = reader.read_file_slice_from_metadata_table_paths_blocking(
- METADATA_TABLE_FILES_BASE_FILE,
- METADATA_TABLE_FILES_LOG_FILES.to_vec(),
- )?;
+ // Test 1: Read all records (empty keys)
+ let all_records = reader
+ .read_metadata_table_files_partition(&file_slice, &[])
+ .await?;
- // Should still have 4 keys after merging
- assert_eq!(merged.len(), 4, "Should have 4 partition keys after
merge");
+ // Should have 4 keys after merging
+ assert_eq!(
+ all_records.len(),
+ 4,
+ "Should have 4 partition keys after merge"
+ );
// Validate all partition keys have correct record types
- for (key, record) in &merged {
- if key == "__all_partitions__" {
+ for (key, record) in &all_records {
+ if key == FilesPartitionRecord::ALL_PARTITIONS_KEY {
assert_eq!(record.record_type,
MetadataRecordType::AllPartitions);
} else {
assert_eq!(record.record_type, MetadataRecordType::Files);
}
}
- // Expected UUIDs for each partition's files
- const CHENNAI_UUID: &str = "6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc";
- const SAN_FRANCISCO_UUID: &str =
"036ded81-9ed4-479f-bcea-7145dfa0079b";
- const SAO_PAULO_UUID: &str = "8aa68f7e-afd6-4c94-b86c-8a886552e08d";
-
- // Validate chennai partition
- let chennai = merged.get("city=chennai").unwrap();
- let active_files = chennai.active_file_names();
+ // Validate chennai partition has files
+ let chennai = all_records.get("city=chennai").unwrap();
assert!(
- active_files.len() >= 2,
- "Chennai should have at least 2 active files, got {}",
- active_files.len()
+ chennai.active_file_names().len() >= 2,
+ "Chennai should have at least 2 active files"
);
assert!(chennai.total_size() > 0, "Total size should be > 0");
- for file_name in &active_files {
- assert!(
- file_name.contains(CHENNAI_UUID),
- "Chennai file should contain UUID: {}",
- file_name
- );
- }
-
- // Validate san_francisco partition
- let sf = merged.get("city=san_francisco").unwrap();
- for file_name in sf.active_file_names() {
- assert!(
- file_name.contains(SAN_FRANCISCO_UUID),
- "San Francisco file should contain UUID: {}",
- file_name
- );
- }
-
- // Validate sao_paulo partition
- let sp = merged.get("city=sao_paulo").unwrap();
- for file_name in sp.active_file_names() {
- assert!(
- file_name.contains(SAO_PAULO_UUID),
- "Sao Paulo file should contain UUID: {}",
- file_name
- );
- }
-
- Ok(())
- }
-
- #[test]
- fn test_read_file_slice_from_metadata_table_error_handling() -> Result<()>
{
- let reader = create_metadata_table_reader()?;
-
- // Test with non-existent base file
- let result = reader.read_file_slice_from_metadata_table_paths_blocking(
- "files/nonexistent.hfile",
- Vec::<&str>::new(),
- );
-
- assert!(result.is_err(), "Should error on non-existent file");
- let err = result.unwrap_err().to_string();
- assert!(
- err.contains("Failed to read metadata table base file"),
- "Error should mention metadata table base file: {}",
- err
- );
-
- Ok(())
- }
-
- #[test]
- fn test_read_file_slice_from_metadata_table_blocking() -> Result<()> {
- use crate::file_group::FileGroup;
- let reader = create_metadata_table_reader()?;
-
- // Build FileGroup using the API
- let mut fg = FileGroup::new("files-0000-0".to_string(),
"files".to_string());
- let base_file_name = METADATA_TABLE_FILES_BASE_FILE
- .strip_prefix("files/")
- .unwrap();
- fg.add_base_file_from_name(base_file_name)?;
- let log_file_names: Vec<_> = METADATA_TABLE_FILES_LOG_FILES
- .iter()
- .map(|s| s.strip_prefix("files/").unwrap())
- .collect();
- fg.add_log_files_from_names(log_file_names)?;
-
- let file_slice = fg
- .get_file_slice_as_of("99999999999999999")
- .expect("Should have file slice");
-
- let merged =
reader.read_file_slice_from_metadata_table_blocking(file_slice)?;
+ // Test 2: Read specific keys
+ let keys = vec![FilesPartitionRecord::ALL_PARTITIONS_KEY,
"city=chennai"];
+ let filtered_records = reader
+ .read_metadata_table_files_partition(&file_slice, &keys)
+ .await?;
- // Should have 4 keys: __all_partitions__ + 3 city partitions
- assert_eq!(merged.len(), 4);
- assert!(merged.contains_key("__all_partitions__"));
- assert!(merged.contains_key("city=chennai"));
- assert!(merged.contains_key("city=san_francisco"));
- assert!(merged.contains_key("city=sao_paulo"));
+ // Should only contain the requested keys
+ assert_eq!(filtered_records.len(), 2);
+
assert!(filtered_records.contains_key(FilesPartitionRecord::ALL_PARTITIONS_KEY));
+ assert!(filtered_records.contains_key("city=chennai"));
+ assert!(!filtered_records.contains_key("city=san_francisco"));
+ assert!(!filtered_records.contains_key("city=sao_paulo"));
Ok(())
}
diff --git a/crates/core/src/hfile/reader.rs b/crates/core/src/hfile/reader.rs
index 3c7d11e..82b56ec 100644
--- a/crates/core/src/hfile/reader.rs
+++ b/crates/core/src/hfile/reader.rs
@@ -879,20 +879,20 @@ impl HFileReader {
Ok(HFileIterator { reader: self })
}
- // ================== HFileRecord API for MDT ==================
+ // ================== HFileRecord API for metadata table ==================
/// Convert a KeyValue to an owned HFileRecord.
///
/// This extracts the key content (without length prefix) and value bytes
- /// into an owned struct suitable for MDT operations.
+ /// into an owned struct suitable for metadata table operations.
fn key_value_to_record(kv: &KeyValue) -> HFileRecord {
HFileRecord::new(kv.key().content().to_vec(), kv.value().to_vec())
}
/// Collect all records from the HFile as owned HFileRecords.
///
- /// This is useful for MDT operations where records need to be stored
- /// and merged with log file records.
+ /// This is useful for metadata table operations where records need to be
+ /// stored and merged with log file records.
///
/// # Example
/// ```ignore
@@ -1998,7 +1998,7 @@ mod tests {
// 3. "city=san_francisco" - 2 parquet files (UUID:
036ded81-9ed4-479f-bcea-7145dfa0079b)
// 4. "city=sao_paulo" - 2 parquet files (UUID:
8aa68f7e-afd6-4c94-b86c-8a886552e08d)
- use crate::metadata::table_record::decode_files_partition_record;
+ use crate::metadata::table_record::{decode_files_partition_record,
FilesPartitionRecord};
use hudi_test::QuickstartTripsTable;
/// Get the path to the files partition directory in the test table.
@@ -2062,7 +2062,7 @@ mod tests {
assert_eq!(
keys,
vec![
- "__all_partitions__",
+ FilesPartitionRecord::ALL_PARTITIONS_KEY,
"city=chennai",
"city=san_francisco",
"city=sao_paulo"
@@ -2100,7 +2100,7 @@ mod tests {
.unwrap_or_else(|e| panic!("Failed to decode record for key
{}: {}", key, e));
match key {
- "__all_partitions__" => {
+ FilesPartitionRecord::ALL_PARTITIONS_KEY => {
// Validate ALL_PARTITIONS record type and partitions
assert_eq!(
files_record.record_type,
diff --git a/crates/core/src/hfile/record.rs b/crates/core/src/hfile/record.rs
index 4ee6364..b041ed4 100644
--- a/crates/core/src/hfile/record.rs
+++ b/crates/core/src/hfile/record.rs
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-//! HFile record types for MDT (Metadata Table) operations.
+//! HFile record types for metadata table operations.
//!
//! This module provides simple, owned record types for HFile key-value pairs.
-//! These are designed for use in MDT operations where:
+//! These are designed for use in metadata table operations where:
//! - Records need to be passed around and stored
//! - Key-based lookups and merging are primary operations
//! - Values are Avro-serialized payloads decoded on demand
@@ -31,8 +31,8 @@ use std::cmp::Ordering;
/// An owned HFile record with key and value.
///
-/// This is a simple struct designed for MDT operations. The key is the
-/// UTF-8 record key (content only, without HFile key structure), and
+/// This is a simple struct designed for metadata table operations. The key is
+/// the UTF-8 record key (content only, without HFile key structure), and
/// the value is the raw bytes (typically Avro-serialized payload).
///
/// # Example
@@ -83,7 +83,7 @@ impl HFileRecord {
/// Returns whether this record represents a deletion.
///
- /// In MDT, a deleted record has an empty value.
+ /// In the metadata table, a deleted record has an empty value.
pub fn is_deleted(&self) -> bool {
self.value.is_empty()
}
diff --git a/crates/core/src/metadata/merger.rs
b/crates/core/src/metadata/merger.rs
index b0afac0..f121099 100644
--- a/crates/core/src/metadata/merger.rs
+++ b/crates/core/src/metadata/merger.rs
@@ -105,6 +105,63 @@ impl FilesPartitionMerger {
Ok(merged)
}
+ /// Merge records, optionally filtering to specific keys.
+ ///
+ /// When `keys` is empty, processes all records (same as `merge()`).
+ /// When `keys` is non-empty, only processes records matching those keys,
+ /// which is useful for efficient partition pruning.
+ ///
+ /// # Arguments
+ /// * `base_records` - Records from the base HFile
+ /// * `log_records` - Records from log files
+ /// * `keys` - Only process records with these keys. If empty, process all.
+ ///
+ /// # Returns
+ /// A HashMap containing the requested keys (or all if keys is empty).
+ pub fn merge_for_keys(
+ &self,
+ base_records: &[HFileRecord],
+ log_records: &[HFileRecord],
+ keys: &[&str],
+ ) -> Result<HashMap<String, FilesPartitionRecord>> {
+ // When keys is empty, process all records
+ if keys.is_empty() {
+ return self.merge(base_records, log_records);
+ }
+
+ let key_set: std::collections::HashSet<&str> =
keys.iter().copied().collect();
+ let mut merged: HashMap<String, FilesPartitionRecord> = HashMap::new();
+
+ // Process base records, filtering by key
+ for record in base_records {
+ if let Some(key_str) = record.key_as_str() {
+ if key_set.contains(key_str) {
+ let decoded = self.decode_record(record)?;
+ merged.insert(decoded.key.clone(), decoded);
+ }
+ }
+ }
+
+ // Process log records, filtering by key
+ for record in log_records {
+ if let Some(key_str) = record.key_as_str() {
+ if key_set.contains(key_str) {
+ let decoded = self.decode_record(record)?;
+ match merged.get_mut(&decoded.key) {
+ Some(existing) => {
+ self.merge_files_partition_records(existing,
&decoded);
+ }
+ None => {
+ merged.insert(decoded.key.clone(), decoded);
+ }
+ }
+ }
+ }
+ }
+
+ Ok(merged)
+ }
+
/// Decode an HFile record using the schema.
fn decode_record(&self, record: &HFileRecord) ->
Result<FilesPartitionRecord> {
decode_files_partition_record_with_schema(record, &self.schema)
@@ -159,7 +216,7 @@ impl FilesPartitionMerger {
mod tests {
use super::*;
use crate::hfile::HFileReader;
- use crate::metadata::table_record::MetadataRecordType;
+ use crate::metadata::table_record::{FilesPartitionRecord,
MetadataRecordType};
use hudi_test::QuickstartTripsTable;
use std::path::PathBuf;
@@ -422,7 +479,7 @@ mod tests {
// Validate __all_partitions__ - should list all 3 city partitions
let all_partitions = merged
- .get("__all_partitions__")
+ .get(FilesPartitionRecord::ALL_PARTITIONS_KEY)
.expect("Should have __all_partitions__");
assert_eq!(
all_partitions.record_type,
@@ -533,4 +590,45 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_merge_for_keys_filters_to_requested_keys() -> crate::Result<()> {
+ // Get base records from the HFile
+ let dir = files_partition_dir();
+ let mut hfiles: Vec<_> = std::fs::read_dir(&dir)
+ .unwrap()
+ .filter_map(|e| e.ok())
+ .filter(|e| {
+ e.path()
+ .extension()
+ .map(|ext| ext == "hfile")
+ .unwrap_or(false)
+ })
+ .collect();
+ hfiles.sort_by_key(|e| e.file_name());
+
+ let hfile_path = hfiles.last().expect("No HFile found").path();
+ let bytes = std::fs::read(&hfile_path).expect("Failed to read HFile");
+ let mut reader = HFileReader::new(bytes).expect("Failed to create
HFileReader");
+ let schema = reader
+ .get_avro_schema()
+ .expect("Failed to get schema")
+ .expect("No schema in HFile")
+ .clone();
+ let base_records = reader.collect_records().expect("Failed to collect
records");
+
+ let merger = FilesPartitionMerger::new(schema);
+
+ // Request only specific keys
+ let keys = vec![FilesPartitionRecord::ALL_PARTITIONS_KEY,
"city=chennai"];
+ let merged = merger.merge_for_keys(&base_records, &[], &keys)?;
+
+ // Should only contain requested keys
+ assert_eq!(merged.len(), 2);
+ assert!(merged.contains_key(FilesPartitionRecord::ALL_PARTITIONS_KEY));
+ assert!(merged.contains_key("city=chennai"));
+ assert!(!merged.contains_key("city=san_francisco"));
+
+ Ok(())
+ }
}
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
index e80a49f..52a64a6 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/metadata/mod.rs
@@ -20,7 +20,8 @@ pub mod commit;
pub mod merger;
pub mod meta_field;
pub mod replace_commit;
-pub mod table_record;
+pub mod table;
+pub use table::records as table_record;
pub const HUDI_METADATA_DIR: &str = ".hoodie";
pub const DELTALAKE_METADATA_DIR: &str = "_delta_log";
diff --git a/crates/core/src/metadata/table/mod.rs
b/crates/core/src/metadata/table/mod.rs
new file mode 100644
index 0000000..9d81cd0
--- /dev/null
+++ b/crates/core/src/metadata/table/mod.rs
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Metadata table APIs for the Hudi Table.
+//!
+//! This module provides methods for interacting with Hudi's metadata table,
+//! which stores file listings and other metadata for efficient table
operations.
+
+pub mod records;
+
+use std::collections::HashMap;
+
+use crate::config::read::HudiReadConfig;
+use crate::config::table::HudiTableConfig::{
+ MetadataTableEnabled, MetadataTablePartitions, PartitionFields,
TableVersion,
+};
+use crate::error::CoreError;
+use crate::expr::filter::from_str_tuples;
+use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
+use crate::storage::util::join_url_segments;
+use crate::table::partition::PartitionPruner;
+use crate::table::Table;
+use crate::Result;
+
+use records::FilesPartitionRecord;
+
+impl Table {
+ /// Check if this table is a metadata table.
+ ///
+ /// Detection is based on the base path ending with `.hoodie/metadata`.
+ pub fn is_metadata_table(&self) -> bool {
+ let base_path: String = self
+ .hudi_configs
+ .get_or_default(crate::config::table::HudiTableConfig::BasePath)
+ .into();
+ crate::util::path::is_metadata_table_path(&base_path)
+ }
+
+ /// Get the list of available metadata table partitions for this table.
+ ///
+ /// Returns the partitions configured in [`MetadataTablePartitions`].
+ pub fn get_metadata_table_partitions(&self) -> Vec<String> {
+ self.hudi_configs
+ .get_or_default(MetadataTablePartitions)
+ .into()
+ }
+
+ /// Check if the metadata table is enabled.
+ ///
+ /// Returns `true` if:
+ /// 1. Table version is >= 8 (metadata table support is only for v8+
tables), AND
+ /// 2. Table is not a metadata table itself, AND
+ /// 3. Either:
+ /// - `hoodie.metadata.enable` is explicitly true, OR
+ /// - `files` is in the configured [`MetadataTablePartitions`]
+ ///
+ /// # Note
+ /// The metadata table is considered active when partitions are configured,
+ /// even without explicit `hoodie.metadata.enable=true`. When metadata
table
+ /// is enabled, it must have at least the `files` partition enabled.
+ pub fn is_metadata_table_enabled(&self) -> bool {
+ // TODO: drop v6 support then no need to check table version here
+ let table_version: isize = self
+ .hudi_configs
+ .get(TableVersion)
+ .map(|v| v.into())
+ .unwrap_or(0);
+
+ if table_version < 8 {
+ return false;
+ }
+
+ if self.is_metadata_table() {
+ return false;
+ }
+
+ // Check if "files" partition is configured
+ let has_files_partition = self
+ .get_metadata_table_partitions()
+ .contains(&FilesPartitionRecord::PARTITION_NAME.to_string());
+
+ // Explicit check for hoodie.metadata.enable
+ let metadata_explicitly_enabled: bool = self
+ .hudi_configs
+ .get_or_default(MetadataTableEnabled)
+ .into();
+
+ metadata_explicitly_enabled || has_files_partition
+ }
+
+ /// Create a metadata table instance for this data table.
+ ///
+ /// TODO: support more partitions. Only "files" is used currently.
+ ///
+ /// # Errors
+ ///
+ /// Returns an error if the metadata table cannot be created or if there
are
+ /// no metadata table partitions configured.
+ ///
+ /// # Note
+ /// Must be called on a DATA table, not a METADATA table.
+ pub async fn new_metadata_table(&self) -> Result<Table> {
+ if self.is_metadata_table() {
+ return Err(CoreError::MetadataTable(
+ "Cannot create metadata table from another metadata
table".to_string(),
+ ));
+ }
+
+ let mdt_partitions = self.get_metadata_table_partitions();
+ if mdt_partitions.is_empty() {
+ return Err(CoreError::MetadataTable(
+ "No metadata table partitions configured".to_string(),
+ ));
+ }
+
+ let mdt_url = join_url_segments(&self.base_url(), &[".hoodie",
"metadata"])?;
+ Table::new_with_options(
+ mdt_url.as_str(),
+ [(PartitionFields.as_ref(), METADATA_TABLE_PARTITION_FIELD)],
+ )
+ .await
+ }
+
+ /// Same as [Table::new_metadata_table], but blocking.
+ pub fn new_metadata_table_blocking(&self) -> Result<Table> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+ .block_on(async { self.new_metadata_table().await })
+ }
+
+ /// Fetch records from the `files` partition of the metadata table
+ /// with optional data table partition pruning.
+ ///
+ /// # Note
+ /// Must be called on a DATA table, not a METADATA table.
+ pub async fn read_metadata_table_files_partition(
+ &self,
+ partition_pruner: &PartitionPruner,
+ ) -> Result<HashMap<String, FilesPartitionRecord>> {
+ let metadata_table = self.new_metadata_table().await?;
+ metadata_table
+ .fetch_files_partition_records(partition_pruner)
+ .await
+ }
+
+ /// Same as [Table::read_metadata_table_files_partition], but blocking.
+ pub fn read_metadata_table_files_partition_blocking(
+ &self,
+ partition_pruner: &PartitionPruner,
+ ) -> Result<HashMap<String, FilesPartitionRecord>> {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?
+
.block_on(self.read_metadata_table_files_partition(partition_pruner))
+ }
+
+ /// Fetch records from the `files` partition with optional partition
pruning.
+ ///
+ /// # Arguments
+ /// * `partition_pruner` - Data table's partition pruner to filter
partitions.
+ ///
+ /// # Note
+ /// Must be called on a METADATA table instance.
+ pub async fn fetch_files_partition_records(
+ &self,
+ partition_pruner: &PartitionPruner,
+ ) -> Result<HashMap<String, FilesPartitionRecord>> {
+ // If no partition filters, read all records (pass empty keys)
+ if partition_pruner.is_empty() {
+ return self.read_files_partition(&[]).await;
+ }
+
+ // Step 1: Get all partition paths
+ let all_partitions_records = self
+ .read_files_partition(&[FilesPartitionRecord::ALL_PARTITIONS_KEY])
+ .await?;
+
+ let partition_names: Vec<&str> = all_partitions_records
+ .get(FilesPartitionRecord::ALL_PARTITIONS_KEY)
+ .map(|r| r.partition_names())
+ .unwrap_or_default();
+
+ // Step 2: Apply partition pruning
+ let pruned: Vec<&str> = partition_names
+ .into_iter()
+ .filter(|p| partition_pruner.should_include(p))
+ .collect();
+
+ if pruned.is_empty() {
+ return Ok(HashMap::new());
+ }
+
+ // Step 3: Read only the pruned partition records
+ self.read_files_partition(&pruned).await
+ }
+
+ /// Read records from the `files` partition.
+ ///
+ /// If keys is empty, reads all records. Otherwise, reads only the
specified keys.
+ ///
+ /// # Note
+ /// Must be called on a METADATA table instance.
+ async fn read_files_partition(
+ &self,
+ keys: &[&str],
+ ) -> Result<HashMap<String, FilesPartitionRecord>> {
+ let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() else {
+ return Ok(HashMap::new());
+ };
+
+ let timeline_view = self.timeline.create_view_as_of(timestamp).await?;
+
+ let filters = from_str_tuples([(
+ METADATA_TABLE_PARTITION_FIELD,
+ "=",
+ FilesPartitionRecord::PARTITION_NAME,
+ )])?;
+ let partition_schema = self.get_partition_schema().await?;
+ let partition_pruner =
+ PartitionPruner::new(&filters, &partition_schema,
self.hudi_configs.as_ref())?;
+
+ let file_slices = self
+ .file_system_view
+ .get_file_slices_by_storage_listing(&partition_pruner,
&timeline_view)
+ .await?;
+
+ if file_slices.len() != 1 {
+ return Err(CoreError::MetadataTable(format!(
+ "Expected 1 file slice for {} partition, got {}",
+ FilesPartitionRecord::PARTITION_NAME,
+ file_slices.len()
+ )));
+ }
+
+ let file_slice = file_slices.into_iter().next().unwrap();
+ let fg_reader = self.create_file_group_reader_with_options([(
+ HudiReadConfig::FileGroupEndTimestamp,
+ timestamp,
+ )])?;
+
+ fg_reader
+ .read_metadata_table_files_partition(&file_slice, keys)
+ .await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::config::table::HudiTableConfig::TableVersion;
+ use crate::table::partition::PartitionPruner;
+ use hudi_test::{QuickstartTripsTable, SampleTable};
+ use records::{FilesPartitionRecord, MetadataRecordType};
+ use std::collections::HashSet;
+
+ fn get_data_table() -> Table {
+ let table_path =
QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
+ Table::new_blocking(&table_path).unwrap()
+ }
+
+ #[test]
+ fn hudi_table_read_metadata_table_files_partition() {
+ let data_table = get_data_table();
+ let partition_schema =
data_table.get_partition_schema_blocking().unwrap();
+ let partition_pruner =
+ PartitionPruner::new(&[], &partition_schema,
data_table.hudi_configs.as_ref()).unwrap();
+
+ let records = data_table
+ .read_metadata_table_files_partition_blocking(&partition_pruner)
+ .unwrap();
+
+ // Should have 4 records: __all_partitions__ + 3 city partitions
+ assert_eq!(records.len(), 4);
+
+ // Validate __all_partitions__ record
+ let all_partitions = records
+ .get(FilesPartitionRecord::ALL_PARTITIONS_KEY)
+ .unwrap();
+ assert_eq!(
+ all_partitions.record_type,
+ MetadataRecordType::AllPartitions
+ );
+ let partition_names: HashSet<&str> =
all_partitions.partition_names().into_iter().collect();
+ assert_eq!(
+ partition_names,
+ HashSet::from(["city=chennai", "city=san_francisco",
"city=sao_paulo"])
+ );
+
+ // Validate city=chennai record with actual file names
+ let chennai = records.get("city=chennai").unwrap();
+ assert_eq!(chennai.record_type, MetadataRecordType::Files);
+ let chennai_files: HashSet<_> =
chennai.active_file_names().into_iter().collect();
+ assert_eq!(
+ chennai_files,
+ HashSet::from([
+
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_2-986-2794_20251220210108078.parquet",
+
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_0-1112-3190_20251220210129235.parquet",
+
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210127080.log.1_0-1072-3078",
+
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210128625.log.1_0-1097-3150",
+ ])
+ );
+ assert!(chennai.total_size() > 0);
+ }
+
+ #[test]
+ fn hudi_table_get_metadata_table_partitions() {
+ let data_table = get_data_table();
+
+ // Verify we can get the metadata table partitions from the data table
+ let partitions = data_table.get_metadata_table_partitions();
+
+ // The test table has 5 metadata table partitions configured
+ assert_eq!(
+ partitions.len(),
+ 5,
+ "Should have 5 metadata table partitions, got: {:?}",
+ partitions
+ );
+
+ // Verify all expected partitions are present
+ let expected = [
+ "column_stats",
+ "files",
+ "partition_stats",
+ "record_index",
+ "secondary_index_rider_idx",
+ ];
+ for partition in &expected {
+ assert!(
+ partitions.contains(&partition.to_string()),
+ "Should contain '{}' partition, got: {:?}",
+ partition,
+ partitions
+ );
+ }
+ }
+
+ #[test]
+ fn hudi_table_is_metadata_table_enabled() {
+ // V8 table with files partition configured should enable metadata
table
+ // even without explicit hoodie.metadata.enable=true
+ let data_table = get_data_table();
+
+ // Verify it's a v8 table
+ let table_version: isize = data_table
+ .hudi_configs
+ .get(TableVersion)
+ .map(|v| v.into())
+ .unwrap_or(0);
+ assert_eq!(table_version, 8, "Test table should be v8");
+
+ // Verify files partition is configured
+ let partitions = data_table.get_metadata_table_partitions();
+ assert!(
+ partitions.contains(&"files".to_string()),
+ "Should have 'files' partition configured"
+ );
+
+ // Verify is_metadata_table_enabled returns true (implicit enablement)
+ assert!(
+ data_table.is_metadata_table_enabled(),
+ "is_metadata_table_enabled should return true for v8 table with
files partition"
+ );
+ }
+
+ #[test]
+ fn hudi_table_v6_metadata_table_not_enabled() {
+ // V6 tables should NOT have metadata table enabled, even with
explicit setting
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+
+ // Verify it's a v6 table
+ let table_version: isize = hudi_table
+ .hudi_configs
+ .get(TableVersion)
+ .map(|v| v.into())
+ .unwrap_or(0);
+ assert_eq!(table_version, 6, "Test table should be v6");
+
+ // V6 tables should not have metadata table enabled
+ assert!(
+ !hudi_table.is_metadata_table_enabled(),
+ "is_metadata_table_enabled should return false for v6 table"
+ );
+ }
+
+ #[test]
+ fn hudi_table_is_not_metadata_table() {
+ // A regular data table should not be a metadata table
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ assert!(
+ !hudi_table.is_metadata_table(),
+ "Regular data table should not be a metadata table"
+ );
+ }
+
+ #[test]
+ fn hudi_table_metadata_table_is_metadata_table() {
+ // Create a metadata table and verify it's recognized as such
+ let data_table = get_data_table();
+ let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+ assert!(
+ metadata_table.is_metadata_table(),
+ "Metadata table should be recognized as a metadata table"
+ );
+ }
+
+ #[test]
+ fn hudi_table_new_metadata_table_from_metadata_table_errors() {
+ // Trying to create a metadata table from a metadata table should fail
+ let data_table = get_data_table();
+ let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+
+ let result = metadata_table.new_metadata_table_blocking();
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("Cannot create metadata table from another metadata
table"),
+ "Error message should indicate cannot create from metadata table"
+ );
+ }
+}
diff --git a/crates/core/src/metadata/table_record.rs
b/crates/core/src/metadata/table/records.rs
similarity index 92%
rename from crates/core/src/metadata/table_record.rs
rename to crates/core/src/metadata/table/records.rs
index c972904..fd78c41 100644
--- a/crates/core/src/metadata/table_record.rs
+++ b/crates/core/src/metadata/table/records.rs
@@ -39,6 +39,40 @@ use apache_avro::types::Value as AvroValue;
use apache_avro::Schema as AvroSchema;
use std::collections::HashMap;
+/// Metadata table partition types.
+///
+/// These represent the different partitions (directories) within the metadata
table,
+/// each storing a different type of index data.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum MetadataPartitionType {
+ /// The "files" partition containing file listings per data table
partition.
+ Files,
+ /// The "column_stats" partition containing column statistics.
+ ColumnStats,
+ /// The "partition_stats" partition containing partition-level statistics.
+ PartitionStats,
+ /// The "record_index" partition containing record-level index entries.
+ RecordIndex,
+}
+
+impl MetadataPartitionType {
+ /// Get the partition directory name as used in the metadata table.
+ pub fn partition_name(&self) -> &'static str {
+ match self {
+ Self::Files => "files",
+ Self::ColumnStats => "column_stats",
+ Self::PartitionStats => "partition_stats",
+ Self::RecordIndex => "record_index",
+ }
+ }
+}
+
+impl std::fmt::Display for MetadataPartitionType {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.partition_name())
+ }
+}
+
/// File information from the metadata table.
#[derive(Debug, Clone, PartialEq)]
pub struct HoodieMetadataFileInfo {
@@ -123,6 +157,9 @@ impl FilesPartitionRecord {
/// The partition name in the metadata table that stores file listings.
pub const PARTITION_NAME: &'static str = "files";
+ /// The key for the record that contains all partition paths.
+ pub const ALL_PARTITIONS_KEY: &'static str = "__all_partitions__";
+
/// Check if this is an ALL_PARTITIONS record.
pub fn is_all_partitions(&self) -> bool {
self.record_type == MetadataRecordType::AllPartitions
@@ -431,6 +468,40 @@ mod tests {
.unwrap_or_else(|| panic!("No HFile found in {:?}", dir))
}
+ #[test]
+ fn test_metadata_partition_type_partition_name() {
+ assert_eq!(MetadataPartitionType::Files.partition_name(), "files");
+ assert_eq!(
+ MetadataPartitionType::ColumnStats.partition_name(),
+ "column_stats"
+ );
+ assert_eq!(
+ MetadataPartitionType::PartitionStats.partition_name(),
+ "partition_stats"
+ );
+ assert_eq!(
+ MetadataPartitionType::RecordIndex.partition_name(),
+ "record_index"
+ );
+ }
+
+ #[test]
+ fn test_metadata_partition_type_display() {
+ assert_eq!(format!("{}", MetadataPartitionType::Files), "files");
+ assert_eq!(
+ format!("{}", MetadataPartitionType::ColumnStats),
+ "column_stats"
+ );
+ assert_eq!(
+ format!("{}", MetadataPartitionType::PartitionStats),
+ "partition_stats"
+ );
+ assert_eq!(
+ format!("{}", MetadataPartitionType::RecordIndex),
+ "record_index"
+ );
+ }
+
#[test]
fn test_metadata_record_type_from_i32() {
assert_eq!(
@@ -502,7 +573,7 @@ mod tests {
// Test ALL_PARTITIONS record
let all_partitions_record = records
.iter()
- .find(|r| r.key_as_str() == Some("__all_partitions__"))
+ .find(|r| r.key_as_str() ==
Some(FilesPartitionRecord::ALL_PARTITIONS_KEY))
.expect("__all_partitions__ record not found");
let decoded = decode_files_partition_record(&reader,
all_partitions_record)
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 24dc4ce..b76837f 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -17,20 +17,20 @@
* under the License.
*/
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
use std::sync::Arc;
use crate::config::table::HudiTableConfig::BaseFileFormat;
use crate::config::HudiConfigs;
-use crate::file_group::FileGroup;
-use crate::metadata::table_record::FilesPartitionRecord;
-use crate::storage::Storage;
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
-
use crate::file_group::builder::file_groups_from_files_partition_records;
use crate::file_group::file_slice::FileSlice;
+use crate::file_group::FileGroup;
+use crate::storage::Storage;
use crate::table::listing::FileLister;
use crate::table::partition::PartitionPruner;
+use crate::table::Table;
+use crate::timeline::completion_time::CompletionTimeView;
+use crate::timeline::view::TimelineView;
use crate::Result;
use dashmap::DashMap;
@@ -58,17 +58,12 @@ impl FileSystemView {
})
}
- /// Load file groups by listing from the file system.
+ /// Load file groups by listing from storage.
///
/// # Arguments
/// * `partition_pruner` - Filters which partitions to include
- /// * `completion_time_view` - View to look up completion timestamps.
- /// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for
all lookups)
- /// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
- ///
- /// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
- /// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
- async fn load_file_groups_from_file_system<V: TimelineViewByCompletionTime
+ Sync>(
+ /// * `completion_time_view` - View to look up completion timestamps
+ async fn load_file_groups_from_storage<V: CompletionTimeView + Sync>(
&self,
partition_pruner: &PartitionPruner,
completion_time_view: &V,
@@ -90,34 +85,25 @@ impl FileSystemView {
/// Load file groups from metadata table records.
///
- /// This is an alternative to `load_file_groups_from_file_system` that
uses pre-fetched
- /// metadata table records instead of listing from storage. Only
partitions that pass the
- /// partition pruner will be loaded.
+ /// This is an alternative to `load_file_groups_from_storage` that uses
+ /// file listing records fetched from the metadata table. Only partitions that
+ /// pass the partition pruner will be loaded.
///
- /// This method is not async because it operates on pre-fetched records -
no I/O is needed.
- ///
- /// TODO: Consider making this async and fetching metadata table records
within this method
- /// instead of receiving pre-fetched records. This would make the API more
symmetric with
- /// `load_file_groups_from_file_system` and encapsulate the metadata table
fetching logic.
+ /// This method is not async because it operates on pre-fetched records.
///
/// # Arguments
- /// * `metadata_table_records` - Metadata table files partition records
+ /// * `records` - Metadata table files partition records
/// * `partition_pruner` - Filters which partitions to include
- /// * `completion_time_view` - View to look up completion timestamps.
- /// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for
all lookups)
- /// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
- ///
- /// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
- /// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
- fn load_file_groups_from_metadata_table<V: TimelineViewByCompletionTime>(
+ /// * `completion_time_view` - View to look up completion timestamps
+ fn load_file_groups_from_metadata_table_records<V: CompletionTimeView>(
&self,
- metadata_table_records: &HashMap<String, FilesPartitionRecord>,
+ records: &HashMap<String,
crate::metadata::table_record::FilesPartitionRecord>,
partition_pruner: &PartitionPruner,
completion_time_view: &V,
) -> Result<()> {
let base_file_format: String =
self.hudi_configs.get_or_default(BaseFileFormat).into();
let file_groups_map = file_groups_from_files_partition_records(
- metadata_table_records,
+ records,
&base_file_format,
completion_time_view,
)?;
@@ -132,15 +118,15 @@ impl FileSystemView {
Ok(())
}
- /// Collect file slices from loaded file groups as of a given timestamp.
- ///
- /// This is a private method that assumes file groups have already been
loaded.
- async fn collect_file_slices_as_of(
+ /// Collect file slices from loaded file groups using the timeline view.
+ async fn collect_file_slices(
&self,
- timestamp: &str,
partition_pruner: &PartitionPruner,
- excluding_file_groups: &HashSet<FileGroup>,
+ timeline_view: &TimelineView,
) -> Result<Vec<FileSlice>> {
+ let timestamp = timeline_view.as_of_timestamp();
+ let excluding_file_groups = timeline_view.excluding_file_groups();
+
let mut file_slices = Vec::new();
for mut partition_entry in self.partition_to_file_groups.iter_mut() {
if !partition_pruner.should_include(partition_entry.key()) {
@@ -160,44 +146,60 @@ impl FileSystemView {
Ok(file_slices)
}
- /// Get file slices as of a given timestamp.
+ /// Get file slices using a [`TimelineView`].
+ ///
+ /// This is the main API for retrieving file slices for snapshot or time-travel queries.
+ /// It loads file groups from the metadata table (if enabled) or storage listing,
+ /// then selects file slices based on the timeline view.
///
- /// This is the single entrypoint for retrieving file slices. It
internally decides
- /// whether to load file groups from metadata table records (if provided)
or from storage listing.
+ /// The [`TimelineView`] encapsulates:
+ /// - The "as of" timestamp for the query
+ /// - File groups to exclude (e.g., from replace commits)
+ /// - Completion time mappings (if needed)
///
/// # Arguments
- /// * `timestamp` - The timestamp to get file slices as of
/// * `partition_pruner` - Filters which partitions to include
- /// * `excluding_file_groups` - File groups to exclude (e.g., replaced
file groups)
- /// * `metadata_table_records` - Optional metadata table files partition
records for
- /// accelerated listing. If provided, file groups are loaded from these
records instead
- /// of storage listing.
- /// * `completion_time_view` - View to look up completion timestamps.
- /// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for
all lookups)
- /// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
- ///
- /// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
- /// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
- pub async fn get_file_slices_as_of<V: TimelineViewByCompletionTime + Sync>(
+ /// * `timeline_view` - The timeline view containing query context
+ /// * `metadata_table` - Optional metadata table instance
+ pub(crate) async fn get_file_slices(
&self,
- timestamp: &str,
partition_pruner: &PartitionPruner,
- excluding_file_groups: &HashSet<FileGroup>,
- metadata_table_records: Option<&HashMap<String, FilesPartitionRecord>>,
- completion_time_view: &V,
+ timeline_view: &TimelineView,
+ metadata_table: Option<&Table>,
) -> Result<Vec<FileSlice>> {
- // Load file groups from metadata table records if provided, otherwise
from file system
- if let Some(records) = metadata_table_records {
- self.load_file_groups_from_metadata_table(
- records,
+ if let Some(mdt) = metadata_table {
+ // Use metadata table for file listing
+ let records =
mdt.fetch_files_partition_records(partition_pruner).await?;
+ self.load_file_groups_from_metadata_table_records(
+ &records,
partition_pruner,
- completion_time_view,
+ timeline_view,
)?;
} else {
- self.load_file_groups_from_file_system(partition_pruner,
completion_time_view)
+ // Fall back to storage listing
+ self.load_file_groups_from_storage(partition_pruner, timeline_view)
.await?;
}
- self.collect_file_slices_as_of(timestamp, partition_pruner,
excluding_file_groups)
+ self.collect_file_slices(partition_pruner, timeline_view)
+ .await
+ }
+
+ /// Get file slices using storage listing only.
+ ///
+ /// This method always lists files from storage, which is needed
+ /// for the metadata table's own file listing flow to avoid async recursion.
+ ///
+ /// # Arguments
+ /// * `partition_pruner` - Filters which partitions to include
+ /// * `timeline_view` - The timeline view containing query context
+ pub(crate) async fn get_file_slices_by_storage_listing(
+ &self,
+ partition_pruner: &PartitionPruner,
+ timeline_view: &TimelineView,
+ ) -> Result<Vec<FileSlice>> {
+ self.load_file_groups_from_storage(partition_pruner, timeline_view)
+ .await?;
+ self.collect_file_slices(partition_pruner, timeline_view)
.await
}
}
@@ -209,35 +211,28 @@ mod tests {
use crate::table::Table;
use hudi_test::SampleTable;
- use std::collections::HashSet;
#[tokio::test]
async fn fs_view_get_latest_file_slices() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let latest_timestamp = hudi_table
- .timeline
- .completed_commits
- .iter()
- .next_back()
- .map(|i| i.timestamp.clone())
- .unwrap();
+ let latest_timestamp =
hudi_table.timeline.get_latest_commit_timestamp().unwrap();
let fs_view = &hudi_table.file_system_view;
- let completion_time_view =
hudi_table.timeline.create_completion_time_view();
assert!(fs_view.partition_to_file_groups.is_empty());
+
+ let timeline_view = hudi_table
+ .timeline
+ .create_view_as_of(&latest_timestamp)
+ .await
+ .unwrap();
let partition_pruner = PartitionPruner::empty();
- let excludes = HashSet::new();
+
let file_slices = fs_view
- .get_file_slices_as_of(
- &latest_timestamp,
- &partition_pruner,
- &excludes,
- None, // mdt_records
- &completion_time_view,
- )
+ .get_file_slices(&partition_pruner, &timeline_view, None)
.await
.unwrap();
+
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
assert_eq!(file_slices.len(), 1);
let file_ids = file_slices
@@ -254,33 +249,23 @@ mod tests {
async fn fs_view_get_latest_file_slices_with_replace_commit() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let latest_timestamp = hudi_table
- .timeline
- .completed_commits
- .iter()
- .next_back()
- .map(|i| i.timestamp.clone())
- .unwrap();
+ let latest_timestamp =
hudi_table.timeline.get_latest_commit_timestamp().unwrap();
let fs_view = &hudi_table.file_system_view;
- let completion_time_view =
hudi_table.timeline.create_completion_time_view();
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
- let partition_pruner = PartitionPruner::empty();
- let excludes = &hudi_table
+
+ let timeline_view = hudi_table
.timeline
- .get_replaced_file_groups_as_of(&latest_timestamp)
+ .create_view_as_of(&latest_timestamp)
.await
.unwrap();
+ let partition_pruner = PartitionPruner::empty();
+
let file_slices = fs_view
- .get_file_slices_as_of(
- &latest_timestamp,
- &partition_pruner,
- excludes,
- None, // mdt_records
- &completion_time_view,
- )
+ .get_file_slices(&partition_pruner, &timeline_view, None)
.await
.unwrap();
+
assert_eq!(fs_view.partition_to_file_groups.len(), 3);
assert_eq!(file_slices.len(), 1);
let file_ids = file_slices
@@ -297,21 +282,14 @@ mod tests {
async fn fs_view_get_latest_file_slices_with_partition_filters() {
let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let latest_timestamp = hudi_table
- .timeline
- .completed_commits
- .iter()
- .next_back()
- .map(|i| i.timestamp.clone())
- .unwrap();
+ let latest_timestamp =
hudi_table.timeline.get_latest_commit_timestamp().unwrap();
let fs_view = &hudi_table.file_system_view;
- let completion_time_view =
hudi_table.timeline.create_completion_time_view();
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
- let excludes = &hudi_table
+ let timeline_view = hudi_table
.timeline
- .get_replaced_file_groups_as_of(&latest_timestamp)
+ .create_view_as_of(&latest_timestamp)
.await
.unwrap();
let partition_schema =
hudi_table.get_partition_schema().await.unwrap();
@@ -326,15 +304,10 @@ mod tests {
.unwrap();
let file_slices = fs_view
- .get_file_slices_as_of(
- &latest_timestamp,
- &partition_pruner,
- excludes,
- None, // mdt_records
- &completion_time_view,
- )
+ .get_file_slices(&partition_pruner, &timeline_view, None)
.await
.unwrap();
+
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
assert_eq!(file_slices.len(), 1);
diff --git a/crates/core/src/table/listing.rs b/crates/core/src/table/listing.rs
index c7b3404..ca9b25e 100644
--- a/crates/core/src/table/listing.rs
+++ b/crates/core/src/table/listing.rs
@@ -28,7 +28,7 @@ use crate::storage::{get_leaf_dirs, Storage};
use crate::table::partition::{
is_table_partitioned, PartitionPruner, EMPTY_PARTITION_PATH,
PARTITION_METAFIELD_PREFIX,
};
-use crate::timeline::completion_time::TimelineViewByCompletionTime;
+use crate::timeline::completion_time::CompletionTimeView;
use crate::Result;
use dashmap::DashMap;
use futures::{stream, StreamExt, TryStreamExt};
@@ -65,15 +65,10 @@ impl FileLister {
/// # Arguments
/// * `partition_path` - The partition path to list files from
/// * `completion_time_view` - View to look up completion timestamps.
- /// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for
all lookups)
- /// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
///
/// Files whose commit timestamps are not found in the completion time view
/// (i.e., uncommitted files) will have `completion_timestamp = None`.
- ///
- /// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
- /// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
- async fn list_file_groups_for_partition<V: TimelineViewByCompletionTime>(
+ async fn list_file_groups_for_partition<V: CompletionTimeView>(
&self,
partition_path: &str,
completion_time_view: &V,
@@ -101,7 +96,7 @@ impl FileLister {
if completion_time_view.should_filter_uncommitted()
&& base_file.completion_timestamp.is_none()
{
- // v8+ table: file is from an uncommitted commit, skip it
+ // file belongs to an uncommitted commit, skip it
continue;
}
@@ -118,7 +113,7 @@ impl FileLister {
if completion_time_view.should_filter_uncommitted()
&& log_file.completion_timestamp.is_none()
{
- // v8+ table: file is from an uncommitted commit,
skip it
+ // file belongs to an uncommitted commit, skip it
continue;
}
@@ -187,14 +182,7 @@ impl FileLister {
///
/// # Arguments
/// * `completion_time_view` - View to look up completion timestamps.
- /// - For v6 tables: Pass [`V6CompletionTimeView`] (returns `None` for
all lookups)
- /// - For v8+ tables: Pass [`CompletionTimeView`] built from timeline
instants
- ///
- /// [`V6CompletionTimeView`]:
crate::timeline::completion_time::V6CompletionTimeView
- /// [`CompletionTimeView`]:
crate::timeline::completion_time::CompletionTimeView
- pub async fn list_file_groups_for_relevant_partitions<
- V: TimelineViewByCompletionTime + Sync,
- >(
+ pub async fn list_file_groups_for_relevant_partitions<V:
CompletionTimeView + Sync>(
&self,
completion_time_view: &V,
) -> Result<DashMap<String, Vec<FileGroup>>> {
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 4ae088c..efa2110 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -94,19 +94,14 @@ pub mod partition;
mod validation;
use crate::config::read::HudiReadConfig;
-use crate::config::table::HudiTableConfig::{
- MetadataTableEnabled, MetadataTablePartitions, PartitionFields,
TableVersion,
-};
+use crate::config::table::HudiTableConfig::PartitionFields;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
-use crate::error::CoreError;
use crate::expr::filter::{from_str_tuples, Filter};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
-use crate::metadata::table_record::FilesPartitionRecord;
use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
-use crate::storage::util::join_url_segments;
use crate::table::builder::TableBuilder;
use crate::table::fs_view::FileSystemView;
use crate::table::partition::PartitionPruner;
@@ -219,108 +214,6 @@ impl Table {
self.table_type() == TableTypeValue::MergeOnRead.as_ref()
}
- /// Check if this table is a metadata table.
- ///
- /// Detection is based on the base path ending with `.hoodie/metadata`.
- pub fn is_metadata_table(&self) -> bool {
- let base_path: String = self
- .hudi_configs
- .get_or_default(HudiTableConfig::BasePath)
- .into();
- crate::util::path::is_metadata_table_path(&base_path)
- }
-
- /// Get the list of available metadata table partitions for this table.
- ///
- /// Returns the partitions configured in
`hoodie.table.metadata.partitions`.
- pub fn get_metadata_table_partitions(&self) -> Vec<String> {
- self.hudi_configs
- .get_or_default(MetadataTablePartitions)
- .into()
- }
-
- /// Check if the metadata table "files" partition is enabled for file
listing.
- ///
- /// Returns `true` if:
- /// 1. Table version is >= 8 (MDT support is only for v8+ tables), AND
- /// 2. Either:
- /// - `hoodie.metadata.enable` is explicitly true, OR
- /// - "files" is in the configured `hoodie.table.metadata.partitions`
- /// (implicit enablement for v8+ tables with configured partitions)
- ///
- /// This matches Hudi Java behavior where metadata table is considered
active
- /// when partitions are configured, even without explicit
`hoodie.metadata.enable=true`.
- pub fn is_metadata_table_enabled(&self) -> bool {
- // Check table version first - MDT is only supported for v8+ tables
- // TODO: drop v6 support then no need to check table version here
- let table_version: isize = self
- .hudi_configs
- .get(TableVersion)
- .map(|v| v.into())
- .unwrap_or(0);
-
- if table_version < 8 {
- return false;
- }
-
- // Check if "files" partition is configured
- let has_files_partition = self
- .get_metadata_table_partitions()
- .contains(&FilesPartitionRecord::PARTITION_NAME.to_string());
-
- // Explicit check for hoodie.metadata.enable
- let metadata_explicitly_enabled: bool = self
- .hudi_configs
- .get_or_default(MetadataTableEnabled)
- .into();
-
- // Enable if explicitly enabled OR if files partition is configured.
- // Note: For v8+ tables, having files partition configured implicitly
enables MDT
- // even if hoodie.metadata.enable is not set or is false. This is
because if the
- // files partition exists, MDT must have been enabled to populate it
(either by
- // Hudi writer or during table migration). This handles tables where
the explicit
- // config was not persisted but MDT was actively used.
- metadata_explicitly_enabled || has_files_partition
- }
-
- /// Create a metadata table instance for this data table.
- ///
- /// Uses all partitions from `hoodie.table.metadata.partitions`
configuration.
- ///
- /// # Errors
- ///
- /// Returns an error if the metadata table cannot be created or if there
are
- /// no metadata table partitions configured.
- pub async fn new_metadata_table(&self) -> Result<Table> {
- if self.is_metadata_table() {
- return Err(CoreError::MetadataTable(
- "Cannot create metadata table from another metadata
table".to_string(),
- ));
- }
-
- let mdt_partitions = self.get_metadata_table_partitions();
- if mdt_partitions.is_empty() {
- return Err(CoreError::MetadataTable(
- "No metadata table partitions configured".to_string(),
- ));
- }
-
- let mdt_url = join_url_segments(&self.base_url(), &[".hoodie",
"metadata"])?;
- Table::new_with_options(
- mdt_url.as_str(),
- [(PartitionFields.as_ref(), METADATA_TABLE_PARTITION_FIELD)],
- )
- .await
- }
-
- /// Same as [Table::new_metadata_table], but blocking.
- pub fn new_metadata_table_blocking(&self) -> Result<Table> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.new_metadata_table().await })
- }
-
pub fn timezone(&self) -> String {
self.hudi_configs
.get_or_default(HudiTableConfig::TimelineTimezone)
@@ -370,7 +263,7 @@ impl Table {
/// Get the latest partition [arrow_schema::Schema] of the table.
///
/// For metadata tables, returns a schema with a single `partition` field
- /// typed as [arrow_schema::DataType::Utf8], since MDT uses a single
partition
+ /// typed as [arrow_schema::DataType::Utf8], since metadata tables use a
single partition
/// column to identify partitions like "files", "column_stats", etc.
///
/// For regular tables, returns the partition fields with their actual
data types
@@ -579,35 +472,20 @@ impl Table {
timestamp: &str,
filters: &[Filter],
) -> Result<Vec<FileSlice>> {
- // Create completion time view for setting completion timestamps on
files.
- // For v8+ tables, this populates completion timestamps and enables
filtering.
- // For v6 tables, the view is empty and acts as a no-op.
- let completion_time_view = self.timeline.create_completion_time_view();
+ let timeline_view = self.timeline.create_view_as_of(timestamp).await?;
- // File slices are keyed by commit_timestamp (request/base instant
time).
- let excludes = self
- .timeline
- .get_replaced_file_groups_as_of(timestamp)
- .await?;
let partition_schema = self.get_partition_schema().await?;
let partition_pruner =
PartitionPruner::new(filters, &partition_schema,
self.hudi_configs.as_ref())?;
- // Try metadata table accelerated listing if files partition is
configured
- let metadata_table_records = if self.is_metadata_table_enabled() {
+ // Try to create metadata table instance if enabled
+ let metadata_table = if self.is_metadata_table_enabled() {
log::debug!("Using metadata table for file listing");
- match self.fetch_metadata_table_records().await {
- Ok(records) => {
- log::debug!(
- "Successfully read {} partition records from metadata
table",
- records.len()
- );
- Some(records)
- }
+ match self.new_metadata_table().await {
+ Ok(mdt) => Some(mdt),
Err(e) => {
- // Fall through to storage listing if metadata table read
fails
log::warn!(
- "Failed to read file slices from metadata table,
falling back to storage listing: {}",
+ "Failed to create metadata table, falling back to
storage listing: {}",
e
);
None
@@ -617,28 +495,11 @@ impl Table {
None
};
- // Use the single entrypoint that handles both metadata table and
storage listing
self.file_system_view
- .get_file_slices_as_of(
- timestamp,
- &partition_pruner,
- &excludes,
- metadata_table_records.as_ref(),
- &completion_time_view,
- )
+ .get_file_slices(&partition_pruner, &timeline_view,
metadata_table.as_ref())
.await
}
- /// Fetch file records from the metadata table.
- ///
- /// The metadata table returns records as-of its current state. For time
travel
- /// or incremental queries, the timestamp filtering is handled by the
caller
- /// using completion time views - the metadata table just provides the
file listing.
- async fn fetch_metadata_table_records(&self) -> Result<HashMap<String,
FilesPartitionRecord>> {
- let metadata_table = self.new_metadata_table().await?;
- metadata_table.read_metadata_files().await
- }
-
/// Get all the changed [FileSlice]s in the table between the given
timestamps.
///
/// # Arguments
@@ -782,78 +643,6 @@ impl Table {
)
}
- /// Read records from the "files" partition of the metadata table.
- ///
- /// This method can only be called on metadata tables. It reads all records
- /// from the "files" partition and returns merged `FilesPartitionRecord`s.
- ///
- /// # Returns
- /// A HashMap mapping record keys to their `FilesPartitionRecord`s.
- pub async fn read_metadata_files(&self) -> Result<HashMap<String,
FilesPartitionRecord>> {
- if !self.is_metadata_table() {
- return Err(CoreError::MetadataTable(
- "read_metadata_files can only be called on metadata
tables".to_string(),
- ));
- }
-
- // Use completion timestamp for file slice queries (v8+ tables key by
completion time)
- let Some(timestamp) =
self.timeline.get_latest_commit_timestamp_as_option() else {
- return Ok(HashMap::new());
- };
-
- // For MDT, always use storage-based listing (not MDT-accelerated)
- // to avoid recursion. MDT never uses itself for file listing.
- let excludes = self
- .timeline
- .get_replaced_file_groups_as_of(timestamp)
- .await?;
- let filters = from_str_tuples([(
- METADATA_TABLE_PARTITION_FIELD,
- "=",
- FilesPartitionRecord::PARTITION_NAME,
- )])?;
- let partition_schema = self.get_partition_schema().await?;
- let partition_pruner =
- PartitionPruner::new(&filters, &partition_schema,
self.hudi_configs.as_ref())?;
- let completion_time_view = self.timeline.create_completion_time_view();
- let file_slices = self
- .file_system_view
- .get_file_slices_as_of(
- timestamp,
- &partition_pruner,
- &excludes,
- None, // Never use MDT for reading MDT itself (avoid recursion)
- &completion_time_view,
- )
- .await?;
-
- if file_slices.len() != 1 {
- return Err(CoreError::MetadataTable(format!(
- "Expected 1 file slice for {} partition, got {}",
- FilesPartitionRecord::PARTITION_NAME,
- file_slices.len()
- )));
- }
- let file_slice = &file_slices[0];
-
- let fg_reader = self.create_file_group_reader_with_options([(
- HudiReadConfig::FileGroupEndTimestamp,
- timestamp,
- )])?;
-
- fg_reader
- .read_file_slice_from_metadata_table(file_slice)
- .await
- }
-
- /// Same as [Table::read_metadata_files], but blocking.
- pub fn read_metadata_files_blocking(&self) -> Result<HashMap<String,
FilesPartitionRecord>> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(self.read_metadata_files())
- }
-
/// Get all the latest records in the table.
///
/// # Arguments
@@ -949,23 +738,24 @@ impl Table {
) -> Result<Vec<RecordBatch>> {
// If the end timestamp is not provided, use the latest file slice
timestamp.
// This is the request timestamp (commit_timestamp) for both v6 and
v8+ tables.
- let Some(end_timestamp) =
+ let Some(end_ts) =
end_timestamp.or_else(||
self.timeline.get_latest_commit_timestamp_as_option())
else {
return Ok(Vec::new());
};
let timezone = self.timezone();
- let start_timestamp = format_timestamp(start_timestamp, &timezone)?;
- let end_timestamp = format_timestamp(end_timestamp, &timezone)?;
+ let start_ts = format_timestamp(start_timestamp, &timezone)?;
+ let end_ts = format_timestamp(end_ts, &timezone)?;
+ // Use incremental API that reads from timeline commit metadata
let file_slices = self
- .get_file_slices_between_internal(&start_timestamp, &end_timestamp)
+ .get_file_slices_between_internal(&start_ts, &end_ts)
.await?;
let fg_reader = self.create_file_group_reader_with_options([
- (HudiReadConfig::FileGroupStartTimestamp, start_timestamp),
- (HudiReadConfig::FileGroupEndTimestamp, end_timestamp),
+ (HudiReadConfig::FileGroupStartTimestamp, start_ts.clone()),
+ (HudiReadConfig::FileGroupEndTimestamp, end_ts),
])?;
let batches =
@@ -1016,9 +806,9 @@ mod tests {
use crate::config::HUDI_CONF_DIR;
use crate::error::CoreError;
use crate::metadata::meta_field::MetaField;
- use crate::metadata::table_record::MetadataRecordType;
use crate::storage::util::join_url_segments;
use crate::storage::Storage;
+ use crate::timeline::EARLIEST_START_TIMESTAMP;
use hudi_test::{assert_arrow_field_names_eq, assert_avro_field_names_eq,
SampleTable};
use std::collections::HashSet;
use std::fs::canonicalize;
@@ -1689,190 +1479,4 @@ mod tests {
let expected = HashSet::new();
assert_eq!(actual, expected);
}
-
- //
=========================================================================
- // Metadata Table Tests
- //
=========================================================================
-
- fn get_data_table() -> Table {
- use hudi_test::QuickstartTripsTable;
- let table_path =
QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
- Table::new_blocking(&table_path).unwrap()
- }
-
- #[test]
- fn hudi_table_read_metadata_files() {
- let data_table = get_data_table();
- let metadata_table = data_table.new_metadata_table_blocking().unwrap();
-
- assert!(metadata_table.is_metadata_table());
-
- let records = metadata_table.read_metadata_files_blocking().unwrap();
-
- // Should have 4 records: __all_partitions__ + 3 city partitions
- assert_eq!(records.len(), 4);
-
- // Validate __all_partitions__ record
- let all_partitions = records.get("__all_partitions__").unwrap();
- assert_eq!(
- all_partitions.record_type,
- MetadataRecordType::AllPartitions
- );
- let partition_names: HashSet<_> =
all_partitions.partition_names().into_iter().collect();
- assert_eq!(
- partition_names,
- HashSet::from(["city=chennai", "city=san_francisco",
"city=sao_paulo"])
- );
-
- // Validate city=chennai record with actual file names
- let chennai = records.get("city=chennai").unwrap();
- assert_eq!(chennai.record_type, MetadataRecordType::Files);
- let chennai_files: HashSet<_> =
chennai.active_file_names().into_iter().collect();
- assert_eq!(
- chennai_files,
- HashSet::from([
-
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_2-986-2794_20251220210108078.parquet",
-
"6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_0-1112-3190_20251220210129235.parquet",
-
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210127080.log.1_0-1072-3078",
-
".6e1d5cc4-c487-487d-abbe-fe9b30b1c0cc-0_20251220210128625.log.1_0-1097-3150",
- ])
- );
- assert!(chennai.total_size() > 0);
- }
-
- #[test]
- fn hudi_table_get_metadata_table_partitions() {
- let data_table = get_data_table();
-
- // Verify we can get the MDT partitions from the data table
- let partitions = data_table.get_metadata_table_partitions();
-
- // The test table has 5 MDT partitions configured
- assert_eq!(
- partitions.len(),
- 5,
- "Should have 5 MDT partitions, got: {:?}",
- partitions
- );
-
- // Verify all expected partitions are present
- let expected = [
- "column_stats",
- "files",
- "partition_stats",
- "record_index",
- "secondary_index_rider_idx",
- ];
- for partition in &expected {
- assert!(
- partitions.contains(&partition.to_string()),
- "Should contain '{}' partition, got: {:?}",
- partition,
- partitions
- );
- }
- }
-
- #[test]
- fn hudi_table_is_metadata_table_enabled() {
- // V8 table with files partition configured should enable MDT
- // even without explicit hoodie.metadata.enable=true
- let data_table = get_data_table();
-
- // Verify it's a v8 table
- let table_version: isize = data_table
- .hudi_configs
- .get(TableVersion)
- .map(|v| v.into())
- .unwrap_or(0);
- assert_eq!(table_version, 8, "Test table should be v8");
-
- // Verify files partition is configured
- let partitions = data_table.get_metadata_table_partitions();
- assert!(
- partitions.contains(&"files".to_string()),
- "Should have 'files' partition configured"
- );
-
- // Verify is_metadata_table_enabled returns true (implicit enablement)
- assert!(
- data_table.is_metadata_table_enabled(),
- "is_metadata_table_enabled should return true for v8 table with
files partition"
- );
- }
-
- #[test]
- fn hudi_table_v6_metadata_table_not_enabled() {
- // V6 tables should NOT have MDT enabled, even with explicit setting
- let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
-
- // Verify it's a v6 table
- let table_version: isize = hudi_table
- .hudi_configs
- .get(TableVersion)
- .map(|v| v.into())
- .unwrap_or(0);
- assert_eq!(table_version, 6, "Test table should be v6");
-
- // V6 tables should not have MDT enabled
- assert!(
- !hudi_table.is_metadata_table_enabled(),
- "is_metadata_table_enabled should return false for v6 table"
- );
- }
-
- #[test]
- fn hudi_table_is_not_metadata_table() {
- // A regular data table should not be a metadata table
- let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
- assert!(
- !hudi_table.is_metadata_table(),
- "Regular data table should not be a metadata table"
- );
- }
-
- #[test]
- fn hudi_table_metadata_table_is_metadata_table() {
- // Create a metadata table and verify it's recognized as such
- let data_table = get_data_table();
- let metadata_table = data_table.new_metadata_table_blocking().unwrap();
- assert!(
- metadata_table.is_metadata_table(),
- "Metadata table should be recognized as a metadata table"
- );
- }
-
- #[test]
- fn hudi_table_new_metadata_table_from_metadata_table_errors() {
- // Trying to create a metadata table from a metadata table should fail
- let data_table = get_data_table();
- let metadata_table = data_table.new_metadata_table_blocking().unwrap();
-
- let result = metadata_table.new_metadata_table_blocking();
- assert!(result.is_err());
- let err = result.unwrap_err();
- assert!(
- err.to_string()
- .contains("Cannot create metadata table from another metadata
table"),
- "Error message should indicate cannot create from metadata table"
- );
- }
-
- #[tokio::test]
- async fn hudi_table_read_metadata_files_on_non_metadata_table_errors() {
- // Calling read_metadata_files on a non-metadata table should fail
- let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new(base_url.path()).await.unwrap();
-
- let result = hudi_table.read_metadata_files().await;
- assert!(result.is_err());
- let err = result.unwrap_err();
- assert!(
- err.to_string()
- .contains("can only be called on metadata tables"),
- "Error message should indicate read_metadata_files requires
metadata table"
- );
- }
}
diff --git a/crates/core/src/timeline/completion_time.rs
b/crates/core/src/timeline/completion_time.rs
index c6cd4dc..f68630c 100644
--- a/crates/core/src/timeline/completion_time.rs
+++ b/crates/core/src/timeline/completion_time.rs
@@ -19,201 +19,39 @@
//! Completion time query view for looking up completion timestamps from
request timestamps.
//!
-//! This module provides the [`TimelineViewByCompletionTime`] trait and
implementations
-//! for mapping request timestamps to completion timestamps. This is essential
for
-//! v8+ tables where file names contain request timestamps but file ordering
and
-//! association must be based on completion timestamps.
+//! This module provides the [`CompletionTimeView`] trait for mapping request
timestamps
+//! to completion timestamps. This is essential for timeline layout v2 where
file names contain
+//! request timestamps but file ordering and association must be based on
completion timestamps.
//!
-//! # Table Version Differences
+//! # Timeline Layout Differences
//!
-//! - **v6 tables**: Completion timestamps are not tracked. Use
[`V6CompletionTimeView`]
-//! which returns `None` for all lookups (completion_timestamp remains
unset).
+//! - **Timeline layout v1**: Completion timestamps are not tracked. The
completion time map is empty
+//! and `get_completion_time()` always returns `None`.
//!
-//! - **v8+ tables**: Request and completion timestamps are different. The
completion timestamp
+//! - **Timeline layout v2**: Completion timestamps are tracked in completed
instants. The completion timestamp
//! is stored in the timeline instant filename as
`{request}_{completion}.{action}`.
-//! Use [`CompletionTimeView`] to look up completion timestamps from a map.
-
-use crate::timeline::instant::Instant;
-use std::collections::HashMap;
+//! The completion time map is populated from timeline instants.
+//!
+//! # Implementation
+//!
+//! [`TimelineView`] is the main implementation of this trait. It checks
+//! [`HudiTableConfig::TimelineLayoutVersion`] to determine whether to build
the completion time map.
+//!
+//! [`TimelineView`]: crate::timeline::view::TimelineView
+//! [`HudiTableConfig::TimelineLayoutVersion`]:
crate::config::table::HudiTableConfig::TimelineLayoutVersion
/// A view for querying completion timestamps from request timestamps.
///
/// This trait abstracts the completion time lookup logic to support both
-/// v6 tables (where completion time is not tracked) and v8+ tables (where
-/// request and completion timestamps differ).
-pub trait TimelineViewByCompletionTime {
+/// V1 and V2 timeline layouts.
+pub trait CompletionTimeView {
/// Get the completion timestamp for a given request timestamp.
///
/// Returns `Some(completion_timestamp)` if the request timestamp
corresponds
/// to a completed commit, `None` if the commit is pending/unknown or if
- /// completion time is not tracked (v6 tables).
+ /// completion time is not tracked.
fn get_completion_time(&self, request_timestamp: &str) -> Option<&str>;
/// Returns true if uncommitted files should be filtered out.
- ///
- /// For v8+ tables, this returns true because we track completion times
- /// and can distinguish committed from uncommitted files.
- /// For v6 tables, this returns false because completion time is not
tracked.
- fn should_filter_uncommitted(&self) -> bool {
- false
- }
-}
-
-/// Completion time view for v6 tables.
-///
-/// In v6, completion time is not tracked separately from request time.
-/// This view always returns `None`, meaning the caller should use the
-/// request timestamp directly for any completion-time-based logic.
-#[derive(Debug, Default)]
-pub struct V6CompletionTimeView;
-
-impl V6CompletionTimeView {
- pub fn new() -> Self {
- Self
- }
-}
-
-impl TimelineViewByCompletionTime for V6CompletionTimeView {
- fn get_completion_time(&self, _request_timestamp: &str) -> Option<&str> {
- None
- }
-}
-
-/// Completion time view for v8+ tables backed by a HashMap.
-///
-/// This view is built from timeline instants and maps request timestamps
-/// to their corresponding completion timestamps.
-#[derive(Debug)]
-pub struct CompletionTimeView {
- /// Map from request timestamp to completion timestamp
- request_to_completion: HashMap<String, String>,
-}
-
-impl CompletionTimeView {
- /// Create a new completion time view from timeline instants.
- ///
- /// Only completed instants with a completion timestamp are included.
- pub fn from_instants<'a, I>(instants: I) -> Self
- where
- I: IntoIterator<Item = &'a Instant>,
- {
- let request_to_completion = instants
- .into_iter()
- .filter_map(|instant| {
- instant
- .completion_timestamp
- .as_ref()
- .map(|completion_ts| (instant.timestamp.clone(),
completion_ts.clone()))
- })
- .collect();
-
- Self {
- request_to_completion,
- }
- }
-
- /// Create a new empty completion time view.
- pub fn empty() -> Self {
- Self {
- request_to_completion: HashMap::new(),
- }
- }
-
- /// Check if this view has any completion time mappings.
- pub fn is_empty(&self) -> bool {
- self.request_to_completion.is_empty()
- }
-
- /// Get the number of completion time mappings.
- pub fn len(&self) -> usize {
- self.request_to_completion.len()
- }
-}
-
-impl TimelineViewByCompletionTime for CompletionTimeView {
- fn get_completion_time(&self, request_timestamp: &str) -> Option<&str> {
- self.request_to_completion
- .get(request_timestamp)
- .map(|s| s.as_str())
- }
-
- fn should_filter_uncommitted(&self) -> bool {
- // Only filter if we have completion time data (v8+ table)
- !self.is_empty()
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::timeline::instant::{Action, State};
-
- fn create_instant(request_ts: &str, completion_ts: Option<&str>) ->
Instant {
- Instant {
- timestamp: request_ts.to_string(),
- completion_timestamp: completion_ts.map(|s| s.to_string()),
- action: Action::Commit,
- state: State::Completed,
- epoch_millis: 0,
- }
- }
-
- #[test]
- fn test_v6_completion_time_view() {
- let view = V6CompletionTimeView::new();
-
- // V6 view always returns None for completion time
- // (caller should use request time directly for v6 tables)
- assert!(view.get_completion_time("20240101120000000").is_none());
- assert!(view.get_completion_time("any_timestamp").is_none());
- }
-
- #[test]
- fn test_timeline_completion_time_view_from_instants() {
- let instants = vec![
- create_instant("20240101120000000", Some("20240101120005000")),
- create_instant("20240101130000000", Some("20240101130010000")),
- create_instant("20240101140000000", None), // Pending instant
- ];
-
- let view = CompletionTimeView::from_instants(&instants);
-
- // Completed instants should have completion times
- assert_eq!(
- view.get_completion_time("20240101120000000"),
- Some("20240101120005000")
- );
- assert_eq!(
- view.get_completion_time("20240101130000000"),
- Some("20240101130010000")
- );
-
- // Pending instant should not have completion time
- assert!(view.get_completion_time("20240101140000000").is_none());
-
- // Unknown timestamp should return None
- assert!(view.get_completion_time("unknown").is_none());
- }
-
- #[test]
- fn test_timeline_completion_time_view_empty() {
- let view = CompletionTimeView::empty();
-
- assert!(view.is_empty());
- assert_eq!(view.len(), 0);
- assert!(view.get_completion_time("any").is_none());
- }
-
- #[test]
- fn test_timeline_completion_time_view_len() {
- let instants = vec![
- create_instant("20240101120000000", Some("20240101120005000")),
- create_instant("20240101130000000", Some("20240101130010000")),
- ];
-
- let view = CompletionTimeView::from_instants(&instants);
-
- assert!(!view.is_empty());
- assert_eq!(view.len(), 2);
- }
+ fn should_filter_uncommitted(&self) -> bool;
}
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 20d8b5a..9f7f8af 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -23,22 +23,21 @@ pub mod loader;
pub mod lsm_tree;
pub(crate) mod selector;
pub(crate) mod util;
+pub mod view;
use crate::config::HudiConfigs;
use crate::error::CoreError;
-use crate::file_group::builder::{
- file_groups_from_commit_metadata,
replaced_file_groups_from_replace_commit, FileGroupMerger,
-};
+use crate::file_group::builder::replaced_file_groups_from_replace_commit;
use crate::file_group::FileGroup;
use crate::schema::resolver::{
resolve_avro_schema_from_commit_metadata,
resolve_schema_from_commit_metadata,
};
use crate::storage::Storage;
use crate::timeline::builder::TimelineBuilder;
-use crate::timeline::completion_time::CompletionTimeView;
use crate::timeline::instant::Action;
use crate::timeline::loader::TimelineLoader;
use crate::timeline::selector::TimelineSelector;
+use crate::timeline::view::TimelineView;
use crate::Result;
use arrow_schema::Schema;
use instant::Instant;
@@ -249,16 +248,16 @@ impl Timeline {
.map_or_else(|| Err(CoreError::TimelineNoCommit), |t|
Ok(t.to_string()))
}
- /// Create a [CompletionTimeView] from the completed commits.
- ///
- /// This view maps request timestamps to completion timestamps, enabling
- /// correct file association for v8+ tables where request and completion
- /// timestamps differ.
- ///
- /// For v6 tables, the view will be empty since completion_timestamp is
None
- /// for all instants, and the caller should use the request timestamp
directly.
- pub fn create_completion_time_view(&self) -> CompletionTimeView {
- CompletionTimeView::from_instants(&self.completed_commits)
    /// Create a [TimelineView] as of the given timestamp.
    ///
    /// The view is built from this timeline's completed commits together with
    /// the file groups that had been replaced (e.g., by clustering or insert
    /// overwrite) as of `timestamp`, so that callers can filter file slices
    /// for snapshot and time-travel queries.
    ///
    /// # Errors
    /// Propagates any error raised while loading the replaced file groups
    /// from the timeline.
    pub async fn create_view_as_of(&self, timestamp: &str) -> Result<TimelineView> {
        // Replaced file groups must be excluded from query results.
        let excludes = self.get_replaced_file_groups_as_of(timestamp).await?;
        Ok(TimelineView::new(
            timestamp.to_string(),
            // No start timestamp: this view serves snapshot/time-travel
            // queries, not incremental ones.
            None,
            &self.completed_commits,
            excludes,
            &self.hudi_configs,
        ))
    }
/// Get the latest [apache_avro::schema::Schema] as [String] from the
[Timeline].
@@ -301,21 +300,28 @@ impl Timeline {
Ok(file_groups)
}
- /// Get file groups in the timeline ranging from start (exclusive) to end
(inclusive).
- /// File groups are as of the [end] timestamp or the latest if not given.
+ /// Get file groups from commit metadata for commits in a time range.
+ ///
+ /// This is used for incremental queries where we only want file groups
+ /// that were modified in the time range (start, end].
///
- /// For v8+ tables, the completion timestamps from the timeline instants
are used
- /// to set the completion_timestamp on base files and log files, enabling
correct
- /// file slice association based on completion order rather than request
order.
+ /// # Arguments
+ /// * `start_timestamp` - Start of the time range (exclusive), None means
no lower bound
+ /// * `end_timestamp` - End of the time range (inclusive), None means no
upper bound
///
- /// For v6 tables, completion_timestamp is None (v6 does not track
completion times).
+ /// # Returns
+ /// File groups that were modified in the time range, excluding replaced
file groups.
pub(crate) async fn get_file_groups_between(
&self,
start_timestamp: Option<&str>,
end_timestamp: Option<&str>,
) -> Result<HashSet<FileGroup>> {
- let mut file_groups: HashSet<FileGroup> = HashSet::new();
- let mut replaced_file_groups: HashSet<FileGroup> = HashSet::new();
+ use crate::file_group::builder::{
+ file_groups_from_commit_metadata,
replaced_file_groups_from_replace_commit,
+ FileGroupMerger,
+ };
+
+ // Get commits in the time range (start, end]
let selector = TimelineSelector::completed_actions_in_range(
DEFAULT_LOADING_ACTIONS,
self.hudi_configs.clone(),
@@ -323,11 +329,21 @@ impl Timeline {
end_timestamp,
)?;
let commits = selector.select(self)?;
+ if commits.is_empty() {
+ return Ok(HashSet::new());
+ }
- // Build completion time view from all commits for v8+ tables.
- // Each file (base or log) may have a different completion timestamp,
- // looked up from its own request timestamp.
- let completion_time_view = CompletionTimeView::from_instants(&commits);
+ // Build completion time view from selected commits only.
+ let completion_time_view = TimelineView::new(
+ commits.last().unwrap().timestamp.clone(),
+ Some(commits.first().unwrap().timestamp.clone()),
+ &commits,
+ HashSet::new(),
+ &self.hudi_configs,
+ );
+
+ let mut file_groups: HashSet<FileGroup> = HashSet::new();
+ let mut replaced_file_groups: HashSet<FileGroup> = HashSet::new();
for commit in commits {
let commit_metadata = self.get_instant_metadata(&commit).await?;
diff --git a/crates/core/src/timeline/view.rs b/crates/core/src/timeline/view.rs
new file mode 100644
index 0000000..9aa2b45
--- /dev/null
+++ b/crates/core/src/timeline/view.rs
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Timeline view for filtering file slices.
+//!
+//! This module provides the [`TimelineView`] struct which encapsulates all
+//! timeline-derived context needed for file slice queries:
+//! - Query timestamp (as of)
+//! - Start timestamp (for incremental queries)
+//! - Completion time mappings
+//! - File groups to be excluded (e.g., replaced by clustering)
+//!
+//! [`TimelineView`] implements [`CompletionTimeView`] trait and is the main
+//! type used for completion time lookups throughout the codebase.
+
+use crate::config::table::HudiTableConfig::TimelineLayoutVersion;
+use crate::config::HudiConfigs;
+use crate::file_group::FileGroup;
+use crate::timeline::completion_time::CompletionTimeView;
+use crate::timeline::instant::Instant;
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
/// Timeline view for filtering file slices.
///
/// Bundles the timeline-derived context a file slice query needs: the query
/// ("as of") timestamp, an optional start timestamp for incremental queries,
/// the request-to-completion timestamp map, and the file groups to exclude
/// from results.
#[derive(Debug)]
pub struct TimelineView {
    /// The "as of" timestamp for the snapshot query.
    /// It is also the end timestamp for incremental queries.
    as_of_timestamp: String,

    /// The start timestamp when the view is used for incremental queries.
    // NOTE(review): not read by any code path yet (hence the allow);
    // presumably reserved for incremental query support — confirm.
    #[allow(dead_code)]
    start_timestamp: Option<String>,

    /// File groups to exclude from the query result.
    ///
    /// These are file groups that have been replaced by clustering
    /// or insert overwrite operations before the query timestamp.
    excluding_file_groups: HashSet<FileGroup>,

    /// Map from request timestamp to completion timestamp.
    ///
    /// Populated for timeline layout v2. Empty for v1.
    request_to_completion: HashMap<String, String>,

    /// Whether this table uses timeline layout v2 (completion time tracking).
    is_timeline_layout_v2: bool,
}
+
+impl TimelineView {
+ /// Create a new timeline view.
+ ///
+ /// # Arguments
+ /// * `as_of_timestamp` - The "as of" timestamp for the snapshot and
time-travel query; also the end timestamp for incremental queries
+ /// * `start_timestamp` - The start timestamp for incremental queries
+ /// * `completed_commits` - Iterator over completed commit instants to
build the view from
+ /// * `excluding_file_groups` - File groups to exclude (e.g., replaced by
clustering)
+ /// * `hudi_configs` - The shared Hudi configurations
+ pub fn new<'a, I>(
+ as_of_timestamp: String,
+ start_timestamp: Option<String>,
+ completed_commits: I,
+ excluding_file_groups: HashSet<FileGroup>,
+ hudi_configs: &Arc<HudiConfigs>,
+ ) -> Self
+ where
+ I: IntoIterator<Item = &'a Instant>,
+ {
+ // Only build completion time map for timeline layout v2
+ let timeline_layout_version: isize = hudi_configs
+ .get(TimelineLayoutVersion)
+ .map(|v| v.into())
+ .unwrap_or(0);
+
+ let is_timeline_layout_v2 = timeline_layout_version >= 2;
+ let request_to_completion = if is_timeline_layout_v2 {
+ Self::build_completion_time_map(completed_commits)
+ } else {
+ HashMap::new()
+ };
+
+ Self {
+ as_of_timestamp,
+ start_timestamp,
+ excluding_file_groups,
+ request_to_completion,
+ is_timeline_layout_v2,
+ }
+ }
+
+ /// Build the completion time map from instants.
+ fn build_completion_time_map<'a, I>(instants: I) -> HashMap<String, String>
+ where
+ I: IntoIterator<Item = &'a Instant>,
+ {
+ instants
+ .into_iter()
+ .filter_map(|instant| {
+ instant
+ .completion_timestamp
+ .as_ref()
+ .map(|completion_ts| (instant.timestamp.clone(),
completion_ts.clone()))
+ })
+ .collect()
+ }
+
+ /// Get the "as of" timestamp for this view.
+ #[inline]
+ pub fn as_of_timestamp(&self) -> &str {
+ &self.as_of_timestamp
+ }
+
+ /// Get the file groups to exclude from the query.
+ #[inline]
+ pub fn excluding_file_groups(&self) -> &HashSet<FileGroup> {
+ &self.excluding_file_groups
+ }
+}
+
+impl CompletionTimeView for TimelineView {
+ fn get_completion_time(&self, request_timestamp: &str) -> Option<&str> {
+ self.request_to_completion
+ .get(request_timestamp)
+ .map(|s| s.as_str())
+ }
+
+ fn should_filter_uncommitted(&self) -> bool {
+ self.is_timeline_layout_v2
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::HudiConfigs;
    use crate::timeline::instant::{Action, State};

    /// Build a completed commit [Instant] with an optional completion timestamp.
    fn create_instant(request_ts: &str, completion_ts: Option<&str>) -> Instant {
        Instant {
            timestamp: request_ts.to_string(),
            completion_timestamp: completion_ts.map(String::from),
            action: Action::Commit,
            state: State::Completed,
            epoch_millis: 0,
        }
    }

    /// Configs declaring timeline layout v1 (no completion time tracking).
    fn create_layout_v1_configs() -> Arc<HudiConfigs> {
        Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "1")]))
    }

    /// Configs declaring timeline layout v2 (completion time tracking).
    fn create_layout_v2_configs() -> Arc<HudiConfigs> {
        Arc::new(HudiConfigs::new([("hoodie.timeline.layout.version", "2")]))
    }

    #[test]
    fn test_snapshot_view_creation_layout_v2() {
        let commits = [
            create_instant("20240101120000000", Some("20240101120005000")),
            create_instant("20240101130000000", Some("20240101130010000")),
        ];

        let view = TimelineView::new(
            "20240101130000000".to_string(),
            None,
            &commits,
            HashSet::new(),
            &create_layout_v2_configs(),
        );

        assert_eq!(view.as_of_timestamp(), "20240101130000000");
        assert!(view.excluding_file_groups().is_empty());
        // Layout v2 should have completion time map populated
        assert!(view.should_filter_uncommitted());
    }

    #[test]
    fn test_snapshot_view_creation_layout_v1() {
        let commits = [
            create_instant("20240101120000000", Some("20240101120005000")),
            create_instant("20240101130000000", Some("20240101130010000")),
        ];

        let view = TimelineView::new(
            "20240101130000000".to_string(),
            None,
            &commits,
            HashSet::new(),
            &create_layout_v1_configs(),
        );

        assert_eq!(view.as_of_timestamp(), "20240101130000000");
        // Layout v1 should NOT have completion time map (empty)
        assert!(!view.should_filter_uncommitted());
        assert!(view.get_completion_time("20240101120000000").is_none());
    }

    #[test]
    fn test_completion_time_lookup_layout_v2() {
        let commits = [
            create_instant("20240101120000000", Some("20240101120005000")),
            create_instant("20240101130000000", Some("20240101130010000")),
            create_instant("20240101140000000", None), // Pending
        ];

        let view = TimelineView::new(
            "20240101140000000".to_string(),
            None,
            &commits,
            HashSet::new(),
            &create_layout_v2_configs(),
        );

        // Completed instants have completion time
        assert_eq!(
            view.get_completion_time("20240101120000000"),
            Some("20240101120005000")
        );
        assert_eq!(
            view.get_completion_time("20240101130000000"),
            Some("20240101130010000")
        );

        // Pending instant has no completion time
        assert!(view.get_completion_time("20240101140000000").is_none());

        // Unknown timestamp returns None
        assert!(view.get_completion_time("unknown").is_none());
    }

    #[test]
    fn test_should_filter_uncommitted_layout_v2() {
        let commits = [create_instant(
            "20240101120000000",
            Some("20240101120005000"),
        )];

        let view = TimelineView::new(
            "20240101120000000".to_string(),
            None,
            &commits,
            HashSet::new(),
            &create_layout_v2_configs(),
        );

        // Layout v2 should filter uncommitted
        assert!(view.should_filter_uncommitted());
    }

    #[test]
    fn test_should_not_filter_uncommitted_layout_v1() {
        // Layout v1 - even with instants that have completion timestamps,
        // the map is not built, so should_filter_uncommitted returns false
        let commits = [create_instant(
            "20240101120000000",
            Some("20240101120005000"),
        )];

        let view = TimelineView::new(
            "20240101120000000".to_string(),
            None,
            &commits,
            HashSet::new(),
            &create_layout_v1_configs(),
        );

        // Layout v1 does not track completion time
        assert!(!view.should_filter_uncommitted());
    }

    #[test]
    fn test_excluding_file_groups() {
        let commits: [Instant; 0] = [];
        let excludes = HashSet::from([
            FileGroup::new("file-id-1".to_string(), "p1".to_string()),
            FileGroup::new("file-id-2".to_string(), "p2".to_string()),
        ]);

        let view = TimelineView::new(
            "20240101120000000".to_string(),
            None,
            &commits,
            excludes,
            &create_layout_v2_configs(),
        );

        assert_eq!(view.excluding_file_groups().len(), 2);
    }
}