This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new e9722ed fix: handle non partition in MDT files partition (#505)
e9722ed is described below
commit e9722edb6d20e878ab3e0b48c3a609d75978680b
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 2 02:03:22 2026 -0600
fix: handle non partition in MDT files partition (#505)
---
crates/core/src/file_group/reader.rs | 26 +++---
crates/core/src/metadata/table/mod.rs | 18 +++-
crates/core/src/metadata/table/records.rs | 39 ++++++---
crates/core/src/table/partition.rs | 9 ++
crates/core/tests/table_read_tests.rs | 92 ++++++++++++++++++++-
.../mor/avro/v8_nonpartitioned.sql | 75 +++++++++++++++++
.../mor/avro/v8_nonpartitioned.zip | Bin 0 -> 378274 bytes
crates/test/src/lib.rs | 51 +++++++++++-
8 files changed, 286 insertions(+), 24 deletions(-)
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 063792e..74eaa56 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -381,22 +381,29 @@ impl FileGroupReader {
.ok_or_else(|| ReadFileSliceError("No Avro schema found in
HFile".to_string()))?
.clone();
- // Read base records: all if keys is empty, targeted lookup otherwise
- let base_records: Vec<HFileRecord> = if keys.is_empty() {
+ let hfile_keys: Vec<&str> = if keys.is_empty() {
+ vec![]
+ } else {
+ let mut sorted = keys.to_vec();
+ sorted.sort();
+ sorted
+ };
+
+ let base_records: Vec<HFileRecord> = if hfile_keys.is_empty() {
hfile_reader.collect_records().map_err(|e| {
ReadFileSliceError(format!("Failed to collect HFile records:
{:?}", e))
})?
} else {
- let lookup_results = hfile_reader.lookup_records(keys).map_err(|e|
{
- ReadFileSliceError(format!("Failed to lookup HFile records:
{:?}", e))
- })?;
- lookup_results
+ hfile_reader
+ .lookup_records(&hfile_keys)
+ .map_err(|e| {
+ ReadFileSliceError(format!("Failed to lookup HFile
records: {:?}", e))
+ })?
.into_iter()
- .filter_map(|(_, opt_record)| opt_record)
+ .filter_map(|(_, r)| r)
.collect()
};
- // Scan log files if present
let log_records = if log_file_paths.is_empty() {
vec![]
} else {
@@ -416,9 +423,8 @@ impl FileGroupReader {
}
};
- // Merge records (key filtering is applied internally when keys is
non-empty)
let merger = FilesPartitionMerger::new(schema);
- merger.merge_for_keys(&base_records, &log_records, keys)
+ merger.merge_for_keys(&base_records, &log_records, &hfile_keys)
}
}
diff --git a/crates/core/src/metadata/table/mod.rs b/crates/core/src/metadata/table/mod.rs
index 9d81cd0..08da4ad 100644
--- a/crates/core/src/metadata/table/mod.rs
+++ b/crates/core/src/metadata/table/mod.rs
@@ -148,6 +148,10 @@ impl Table {
/// Fetch records from the `files` partition of metadata table
/// with optional data table partition pruning.
///
+ /// Records are returned with normalized partition keys. For
non-partitioned tables,
+ /// the key is "" (empty string) instead of the internal "."
representation.
+ /// Normalization happens at decode time in
[`decode_files_partition_record_with_schema`].
+ ///
/// # Note
/// Must be called on a DATA table, not a METADATA table.
pub async fn read_metadata_table_files_partition(
@@ -173,6 +177,9 @@ impl Table {
/// Fetch records from the `files` partition with optional partition
pruning.
///
+ /// For non-partitioned tables, directly fetches the "." record.
+ /// For partitioned tables with filters, performs partition pruning via
`__all_partitions__`.
+ ///
/// # Arguments
/// * `partition_pruner` - Data table's partition pruner to filter
partitions.
///
@@ -182,12 +189,19 @@ impl Table {
&self,
partition_pruner: &PartitionPruner,
) -> Result<HashMap<String, FilesPartitionRecord>> {
- // If no partition filters, read all records (pass empty keys)
+ // Non-partitioned table: directly fetch "." record
+ if !partition_pruner.is_table_partitioned() {
+ return self
+
.read_files_partition(&[FilesPartitionRecord::NON_PARTITIONED_NAME])
+ .await;
+ }
+
+ // Partitioned table without filters: read all records
if partition_pruner.is_empty() {
return self.read_files_partition(&[]).await;
}
- // Step 1: Get all partition paths
+ // Partitioned table with filters: partition pruning
let all_partitions_records = self
.read_files_partition(&[FilesPartitionRecord::ALL_PARTITIONS_KEY])
.await?;
diff --git a/crates/core/src/metadata/table/records.rs b/crates/core/src/metadata/table/records.rs
index fd78c41..f1390f6 100644
--- a/crates/core/src/metadata/table/records.rs
+++ b/crates/core/src/metadata/table/records.rs
@@ -160,15 +160,27 @@ impl FilesPartitionRecord {
/// The key for the record that contains all partition paths.
pub const ALL_PARTITIONS_KEY: &'static str = "__all_partitions__";
+ /// The key used in metadata table for non-partitioned tables.
+ /// The metadata table stores "." for non-partitioned tables, which maps
to "" externally.
+ pub const NON_PARTITIONED_NAME: &'static str = ".";
+
/// Check if this is an ALL_PARTITIONS record.
pub fn is_all_partitions(&self) -> bool {
self.record_type == MetadataRecordType::AllPartitions
}
/// Get list of partition names (for ALL_PARTITIONS record).
+ ///
+ /// Returns the partition keys from the files map. When records are
decoded using
+ /// [`decode_files_partition_record_with_schema`], keys are normalized at
decode time,
+ /// so non-partitioned tables will have "" (empty string) as the partition
key.
+ ///
+ /// The returned list is sorted in ascending order for stable,
deterministic output.
pub fn partition_names(&self) -> Vec<&str> {
if self.is_all_partitions() {
- self.files.keys().map(|s| s.as_str()).collect()
+ let mut names: Vec<&str> = self.files.keys().map(|s|
s.as_str()).collect();
+ names.sort();
+ names
} else {
vec![]
}
@@ -243,14 +255,18 @@ pub fn decode_files_partition_record_with_schema(
record: &HFileRecord,
schema: &AvroSchema,
) -> Result<FilesPartitionRecord> {
- let key = record
+ let raw_key = record
.key_as_str()
- .ok_or_else(|| CoreError::MetadataTable("Invalid UTF-8
key".to_string()))?
- .to_string();
+ .ok_or_else(|| CoreError::MetadataTable("Invalid UTF-8
key".to_string()))?;
+ // Normalize "." -> "" for non-partitioned tables
+ let key = if raw_key == FilesPartitionRecord::NON_PARTITIONED_NAME {
+ String::new()
+ } else {
+ raw_key.to_string()
+ };
let value = record.value();
if value.is_empty() {
- // Tombstone record - treat as deleted Files record
return Ok(FilesPartitionRecord {
key,
record_type: MetadataRecordType::Files,
@@ -258,14 +274,17 @@ pub fn decode_files_partition_record_with_schema(
});
}
- // Decode using Avro
let avro_value = decode_avro_value(value, schema)?;
-
- // Extract record type
let record_type = get_record_type(&avro_value);
+ let mut files = extract_filesystem_metadata(&avro_value);
- // Extract filesystemMetadata map
- let files = extract_filesystem_metadata(&avro_value);
+ // Normalize "." -> "" in AllPartitions files map
+ if record_type == MetadataRecordType::AllPartitions {
+ if let Some(mut file_info) =
files.remove(FilesPartitionRecord::NON_PARTITIONED_NAME) {
+ file_info.name = String::new();
+ files.insert(String::new(), file_info);
+ }
+ }
Ok(FilesPartitionRecord {
key,
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index 46efacd..bcde4bd 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -55,6 +55,7 @@ pub struct PartitionPruner {
schema: Arc<Schema>,
is_hive_style: bool,
is_url_encoded: bool,
+ is_partitioned: bool,
and_filters: Vec<SchemableFilter>,
}
@@ -76,10 +77,12 @@ impl PartitionPruner {
let is_url_encoded: bool = hudi_configs
.get_or_default(HudiTableConfig::IsPartitionPathUrlencoded)
.into();
+ let is_partitioned = is_table_partitioned(hudi_configs);
Ok(PartitionPruner {
schema,
is_hive_style,
is_url_encoded,
+ is_partitioned,
and_filters,
})
}
@@ -90,6 +93,7 @@ impl PartitionPruner {
schema: Arc::new(Schema::empty()),
is_hive_style: false,
is_url_encoded: false,
+ is_partitioned: false,
and_filters: Vec::new(),
}
}
@@ -99,6 +103,11 @@ impl PartitionPruner {
self.and_filters.is_empty()
}
+ /// Returns `true` if the table is partitioned.
+ pub fn is_table_partitioned(&self) -> bool {
+ self.is_partitioned
+ }
+
/// Returns `true` if the partition path should be included based on the
filters.
pub fn should_include(&self, partition_path: &str) -> bool {
let segments = match self.parse_segments(partition_path) {
diff --git a/crates/core/tests/table_read_tests.rs b/crates/core/tests/table_read_tests.rs
index 77ee86f..3dcc51d 100644
--- a/crates/core/tests/table_read_tests.rs
+++ b/crates/core/tests/table_read_tests.rs
@@ -26,7 +26,7 @@ use hudi_core::config::read::HudiReadConfig;
use hudi_core::config::util::empty_filters;
use hudi_core::error::Result;
use hudi_core::table::Table;
-use hudi_test::{QuickstartTripsTable, SampleTable};
+use hudi_test::{QuickstartTripsTable, SampleTable, SampleTableMdt};
/// Test helper module for v6 tables (pre-1.0 spec)
mod v6_tables {
@@ -550,3 +550,93 @@ mod v8_tables {
}
}
}
+
+/// Test module for tables with metadata table (MDT) enabled.
+/// These tests verify MDT-accelerated file listing and partition
normalization.
+mod mdt_enabled_tables {
+ use super::*;
+ use hudi_core::table::partition::PartitionPruner;
+
+ mod snapshot_queries {
+ use super::*;
+
+ /// Test reading a V8 MOR non-partitioned table with MDT enabled.
+ /// Verifies:
+ /// 1. Table can be read correctly via MDT file listing
+ /// 2. MDT partition key normalization ("." -> "") works correctly
+ /// 3. File slices are retrieved correctly from MDT
+ #[test]
+ fn test_v8_nonpartitioned_with_mdt() -> Result<()> {
+ let base_url = SampleTableMdt::V8Nonpartitioned.url_to_mor_avro();
+ let hudi_table = Table::new_blocking(base_url.path())?;
+
+ // Verify MDT is enabled
+ assert!(
+ hudi_table.is_metadata_table_enabled(),
+ "Metadata table should be enabled"
+ );
+
+ // Get file slices - this uses MDT file listing
+ let file_slices =
hudi_table.get_file_slices_blocking(empty_filters())?;
+
+ // Should have file slices for the non-partitioned table
+ assert!(
+ !file_slices.is_empty(),
+ "Should have file slices from MDT listing"
+ );
+
+ // All file slices should be in the root partition (empty string)
+ for fs in &file_slices {
+ assert_eq!(
+ &fs.partition_path, "",
+ "Non-partitioned table should have files in root partition"
+ );
+ }
+
+ Ok(())
+ }
+
+ /// Test MDT partition key normalization for non-partitioned tables.
+ /// The metadata table stores "." as partition key, but external API
should see "".
+ /// For non-partitioned tables, we use a fast path that directly
fetches "." without
+ /// going through __all_partitions__ lookup.
+ #[test]
+ fn test_v8_nonpartitioned_mdt_partition_normalization() -> Result<()> {
+ let base_url = SampleTableMdt::V8Nonpartitioned.url_to_mor_avro();
+ let hudi_table = Table::new_blocking(base_url.path())?;
+
+ // Read MDT files partition records
+ let partition_pruner = PartitionPruner::empty();
+ let records =
+
hudi_table.read_metadata_table_files_partition_blocking(&partition_pruner)?;
+
+ // For non-partitioned tables, the fast path only fetches the
files record.
+ // __all_partitions__ is not fetched to avoid redundant HFile
lookup.
+ assert_eq!(
+ records.len(),
+ 1,
+ "Non-partitioned table fast path should only fetch files
record"
+ );
+
+ // The files record should be keyed by "" (empty string)
+ // not "." (which is the internal MDT representation)
+ assert!(
+ records.contains_key(""),
+ "Non-partitioned table should have files record with empty
string key"
+ );
+ assert!(
+ !records.contains_key("."),
+ "Non-partitioned table should NOT have files record with '.'
key after normalization"
+ );
+
+ // Verify the files record has actual file entries
+ let files_record = records.get("").unwrap();
+ assert!(
+ !files_record.files.is_empty(),
+ "Files record should contain file entries"
+ );
+
+ Ok(())
+ }
+ }
+}
diff --git a/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.sql b/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.sql
new file mode 100644
index 0000000..28bb624
--- /dev/null
+++ b/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.sql
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- V8 MOR non-partitioned table with metadata table enabled.
+
+CREATE TABLE v8_nonpartitioned (
+ id INT,
+ name STRING,
+ isActive BOOLEAN,
+ byteField BYTE,
+ shortField SHORT,
+ intField INT,
+ longField LONG,
+ floatField FLOAT,
+ doubleField DOUBLE,
+ decimalField DECIMAL(10,5),
+ dateField DATE,
+ timestampField TIMESTAMP,
+ binaryField BINARY
+) USING HUDI
+TBLPROPERTIES (
+ type = 'mor',
+ primaryKey = 'id',
+ preCombineField = 'longField',
+ 'hoodie.metadata.enable' = 'true',
+ 'hoodie.metadata.record.index.enable' = 'true',
+ 'hoodie.parquet.small.file.limit' = '0'
+);
+
+-- Initial insert: 5 records
+INSERT INTO v8_nonpartitioned VALUES
+ (1, 'Alice', true, 1, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890,
+ CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP),
CAST('binary data' AS BINARY)),
+ (2, 'Bob', false, 0, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345,
+ CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP),
CAST('more binary data' AS BINARY)),
+ (3, 'Carol', true, 1, 200, 35000, 1928374650, 3.0, 1.41421, 11111.22222,
+ CAST('2023-04-03' AS DATE), CAST('2023-04-03 14:03:00' AS TIMESTAMP),
CAST('even more binary data' AS BINARY)),
+ (4, 'Diana', true, 1, 500, 45000, 987654321, 4.0, 2.468, 65432.12345,
+ CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP),
CAST('new binary data' AS BINARY)),
+ (5, 'Eve', false, 0, 150, 55000, 1122334455, 5.0, 1.732, 99999.11111,
+ CAST('2023-04-05' AS DATE), CAST('2023-04-05 16:05:00' AS TIMESTAMP),
CAST('eve binary data' AS BINARY));
+
+-- Create secondary index on name field
+CREATE INDEX name_idx ON v8_nonpartitioned (name);
+
+-- Update record 1: change isActive and increment longField
+UPDATE v8_nonpartitioned SET isActive = false, longField = 1234567891 WHERE id
= 1;
+
+-- Delete record 2
+DELETE FROM v8_nonpartitioned WHERE id = 2;
+
+-- Update record 3: change doubleField
+UPDATE v8_nonpartitioned SET doubleField = 0.0, longField = 1928374651 WHERE
id = 3;
+
+-- Delete record 3 (after update)
+DELETE FROM v8_nonpartitioned WHERE id = 3;
+
+-- Update record 4: change name
+UPDATE v8_nonpartitioned SET name = 'Diana Updated', longField = 987654322
WHERE id = 4;
diff --git a/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.zip b/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.zip
new file mode 100644
index 0000000..55130ad
Binary files /dev/null and b/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.zip differ
diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs
index 6842c81..8d786a4 100644
--- a/crates/test/src/lib.rs
+++ b/crates/test/src/lib.rs
@@ -122,6 +122,15 @@ pub enum SampleTable {
V8TimebasedkeygenNonhivestyle,
}
+/// Sample tables with metadata table (MDT) enabled and basic schema.
+#[allow(dead_code)]
+#[derive(Debug, EnumString, AsRefStr, EnumIter)]
+#[strum(serialize_all = "snake_case")]
+pub enum SampleTableMdt {
+ /// V8 MOR non-partitioned table with MDT enabled.
+ V8Nonpartitioned,
+}
+
impl SampleTable {
/// Return rows of columns (id, name, isActive) for the given
[RecordBatch] order by id.
pub fn sample_data_order_by_id(record_batch: &RecordBatch) -> Vec<(i32,
&str, bool)> {
@@ -191,11 +200,39 @@ impl SampleTable {
}
}
+impl SampleTableMdt {
+ /// Return rows of columns (id, name, isActive) for the given
[RecordBatch] order by id.
+ pub fn sample_data_order_by_id(record_batch: &RecordBatch) -> Vec<(i32,
&str, bool)> {
+ SampleTable::sample_data_order_by_id(record_batch)
+ }
+
+ fn zip_path(&self, table_type: &str, log_format: Option<&str>) ->
Box<Path> {
+ let dir = env!("CARGO_MANIFEST_DIR");
+ let data_path = Path::new(dir)
+ .join("data/sample_table_use_mdt_basic_schema")
+ .join(table_type.to_lowercase())
+ .join(log_format.unwrap_or_default())
+ .join(format!("{}.zip", self.as_ref()));
+ data_path.into_boxed_path()
+ }
+
+ pub fn path_to_mor_avro(&self) -> String {
+ let zip_path = self.zip_path("mor", Some("avro"));
+ let path_buf =
extract_test_table(zip_path.as_ref()).join(self.as_ref());
+ path_buf.to_str().unwrap().to_string()
+ }
+
+ pub fn url_to_mor_avro(&self) -> Url {
+ let path = self.path_to_mor_avro();
+ Url::from_file_path(path).unwrap()
+ }
+}
+
#[cfg(test)]
mod tests {
use strum::IntoEnumIterator;
- use crate::{QuickstartTripsTable, SampleTable};
+ use crate::{QuickstartTripsTable, SampleTable, SampleTableMdt};
#[test]
fn quickstart_trips_table_zip_file_should_exist() {
@@ -241,4 +278,16 @@ mod tests {
}
}
}
+
+ #[test]
+ fn sample_table_mdt_zip_file_should_exist() {
+ for t in SampleTableMdt::iter() {
+ match t {
+ SampleTableMdt::V8Nonpartitioned => {
+ let path = t.zip_path("mor", Some("avro"));
+ assert!(path.exists(), "zip file should exist: {:?}",
path);
+ }
+ }
+ }
+ }
}