This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new e9722ed  fix: handle non partition in MDT files partition (#505)
e9722ed is described below

commit e9722edb6d20e878ab3e0b48c3a609d75978680b
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri Jan 2 02:03:22 2026 -0600

    fix: handle non partition in MDT files partition (#505)
---
 crates/core/src/file_group/reader.rs               |  26 +++---
 crates/core/src/metadata/table/mod.rs              |  18 +++-
 crates/core/src/metadata/table/records.rs          |  39 ++++++---
 crates/core/src/table/partition.rs                 |   9 ++
 crates/core/tests/table_read_tests.rs              |  92 ++++++++++++++++++++-
 .../mor/avro/v8_nonpartitioned.sql                 |  75 +++++++++++++++++
 .../mor/avro/v8_nonpartitioned.zip                 | Bin 0 -> 378274 bytes
 crates/test/src/lib.rs                             |  51 +++++++++++-
 8 files changed, 286 insertions(+), 24 deletions(-)

diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 063792e..74eaa56 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -381,22 +381,29 @@ impl FileGroupReader {
             .ok_or_else(|| ReadFileSliceError("No Avro schema found in 
HFile".to_string()))?
             .clone();
 
-        // Read base records: all if keys is empty, targeted lookup otherwise
-        let base_records: Vec<HFileRecord> = if keys.is_empty() {
+        let hfile_keys: Vec<&str> = if keys.is_empty() {
+            vec![]
+        } else {
+            let mut sorted = keys.to_vec();
+            sorted.sort();
+            sorted
+        };
+
+        let base_records: Vec<HFileRecord> = if hfile_keys.is_empty() {
             hfile_reader.collect_records().map_err(|e| {
                 ReadFileSliceError(format!("Failed to collect HFile records: 
{:?}", e))
             })?
         } else {
-            let lookup_results = hfile_reader.lookup_records(keys).map_err(|e| 
{
-                ReadFileSliceError(format!("Failed to lookup HFile records: 
{:?}", e))
-            })?;
-            lookup_results
+            hfile_reader
+                .lookup_records(&hfile_keys)
+                .map_err(|e| {
+                    ReadFileSliceError(format!("Failed to lookup HFile 
records: {:?}", e))
+                })?
                 .into_iter()
-                .filter_map(|(_, opt_record)| opt_record)
+                .filter_map(|(_, r)| r)
                 .collect()
         };
 
-        // Scan log files if present
         let log_records = if log_file_paths.is_empty() {
             vec![]
         } else {
@@ -416,9 +423,8 @@ impl FileGroupReader {
             }
         };
 
-        // Merge records (key filtering is applied internally when keys is 
non-empty)
         let merger = FilesPartitionMerger::new(schema);
-        merger.merge_for_keys(&base_records, &log_records, keys)
+        merger.merge_for_keys(&base_records, &log_records, &hfile_keys)
     }
 }
 
diff --git a/crates/core/src/metadata/table/mod.rs 
b/crates/core/src/metadata/table/mod.rs
index 9d81cd0..08da4ad 100644
--- a/crates/core/src/metadata/table/mod.rs
+++ b/crates/core/src/metadata/table/mod.rs
@@ -148,6 +148,10 @@ impl Table {
     /// Fetch records from the `files` partition of metadata table
     /// with optional data table partition pruning.
     ///
+    /// Records are returned with normalized partition keys. For 
non-partitioned tables,
+    /// the key is "" (empty string) instead of the internal "." 
representation.
+    /// Normalization happens at decode time in 
[`decode_files_partition_record_with_schema`].
+    ///
     /// # Note
     /// Must be called on a DATA table, not a METADATA table.
     pub async fn read_metadata_table_files_partition(
@@ -173,6 +177,9 @@ impl Table {
 
     /// Fetch records from the `files` partition with optional partition 
pruning.
     ///
+    /// For non-partitioned tables, directly fetches the "." record.
+    /// For partitioned tables with filters, performs partition pruning via 
`__all_partitions__`.
+    ///
     /// # Arguments
     /// * `partition_pruner` - Data table's partition pruner to filter 
partitions.
     ///
@@ -182,12 +189,19 @@ impl Table {
         &self,
         partition_pruner: &PartitionPruner,
     ) -> Result<HashMap<String, FilesPartitionRecord>> {
-        // If no partition filters, read all records (pass empty keys)
+        // Non-partitioned table: directly fetch "." record
+        if !partition_pruner.is_table_partitioned() {
+            return self
+                
.read_files_partition(&[FilesPartitionRecord::NON_PARTITIONED_NAME])
+                .await;
+        }
+
+        // Partitioned table without filters: read all records
         if partition_pruner.is_empty() {
             return self.read_files_partition(&[]).await;
         }
 
-        // Step 1: Get all partition paths
+        // Partitioned table with filters: partition pruning
         let all_partitions_records = self
             .read_files_partition(&[FilesPartitionRecord::ALL_PARTITIONS_KEY])
             .await?;
diff --git a/crates/core/src/metadata/table/records.rs 
b/crates/core/src/metadata/table/records.rs
index fd78c41..f1390f6 100644
--- a/crates/core/src/metadata/table/records.rs
+++ b/crates/core/src/metadata/table/records.rs
@@ -160,15 +160,27 @@ impl FilesPartitionRecord {
     /// The key for the record that contains all partition paths.
     pub const ALL_PARTITIONS_KEY: &'static str = "__all_partitions__";
 
+    /// The key used in metadata table for non-partitioned tables.
+    /// The metadata table stores "." for non-partitioned tables, which maps 
to "" externally.
+    pub const NON_PARTITIONED_NAME: &'static str = ".";
+
     /// Check if this is an ALL_PARTITIONS record.
     pub fn is_all_partitions(&self) -> bool {
         self.record_type == MetadataRecordType::AllPartitions
     }
 
     /// Get list of partition names (for ALL_PARTITIONS record).
+    ///
+    /// Returns the partition keys from the files map. When records are 
decoded using
+    /// [`decode_files_partition_record_with_schema`], keys are normalized at 
decode time,
+    /// so non-partitioned tables will have "" (empty string) as the partition 
key.
+    ///
+    /// The returned list is sorted in ascending order for stable, 
deterministic output.
     pub fn partition_names(&self) -> Vec<&str> {
         if self.is_all_partitions() {
-            self.files.keys().map(|s| s.as_str()).collect()
+            let mut names: Vec<&str> = self.files.keys().map(|s| 
s.as_str()).collect();
+            names.sort();
+            names
         } else {
             vec![]
         }
@@ -243,14 +255,18 @@ pub fn decode_files_partition_record_with_schema(
     record: &HFileRecord,
     schema: &AvroSchema,
 ) -> Result<FilesPartitionRecord> {
-    let key = record
+    let raw_key = record
         .key_as_str()
-        .ok_or_else(|| CoreError::MetadataTable("Invalid UTF-8 
key".to_string()))?
-        .to_string();
+        .ok_or_else(|| CoreError::MetadataTable("Invalid UTF-8 
key".to_string()))?;
+    // Normalize "." -> "" for non-partitioned tables
+    let key = if raw_key == FilesPartitionRecord::NON_PARTITIONED_NAME {
+        String::new()
+    } else {
+        raw_key.to_string()
+    };
 
     let value = record.value();
     if value.is_empty() {
-        // Tombstone record - treat as deleted Files record
         return Ok(FilesPartitionRecord {
             key,
             record_type: MetadataRecordType::Files,
@@ -258,14 +274,17 @@ pub fn decode_files_partition_record_with_schema(
         });
     }
 
-    // Decode using Avro
     let avro_value = decode_avro_value(value, schema)?;
-
-    // Extract record type
     let record_type = get_record_type(&avro_value);
+    let mut files = extract_filesystem_metadata(&avro_value);
 
-    // Extract filesystemMetadata map
-    let files = extract_filesystem_metadata(&avro_value);
+    // Normalize "." -> "" in AllPartitions files map
+    if record_type == MetadataRecordType::AllPartitions {
+        if let Some(mut file_info) = 
files.remove(FilesPartitionRecord::NON_PARTITIONED_NAME) {
+            file_info.name = String::new();
+            files.insert(String::new(), file_info);
+        }
+    }
 
     Ok(FilesPartitionRecord {
         key,
diff --git a/crates/core/src/table/partition.rs 
b/crates/core/src/table/partition.rs
index 46efacd..bcde4bd 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -55,6 +55,7 @@ pub struct PartitionPruner {
     schema: Arc<Schema>,
     is_hive_style: bool,
     is_url_encoded: bool,
+    is_partitioned: bool,
     and_filters: Vec<SchemableFilter>,
 }
 
@@ -76,10 +77,12 @@ impl PartitionPruner {
         let is_url_encoded: bool = hudi_configs
             .get_or_default(HudiTableConfig::IsPartitionPathUrlencoded)
             .into();
+        let is_partitioned = is_table_partitioned(hudi_configs);
         Ok(PartitionPruner {
             schema,
             is_hive_style,
             is_url_encoded,
+            is_partitioned,
             and_filters,
         })
     }
@@ -90,6 +93,7 @@ impl PartitionPruner {
             schema: Arc::new(Schema::empty()),
             is_hive_style: false,
             is_url_encoded: false,
+            is_partitioned: false,
             and_filters: Vec::new(),
         }
     }
@@ -99,6 +103,11 @@ impl PartitionPruner {
         self.and_filters.is_empty()
     }
 
+    /// Returns `true` if the table is partitioned.
+    pub fn is_table_partitioned(&self) -> bool {
+        self.is_partitioned
+    }
+
     /// Returns `true` if the partition path should be included based on the 
filters.
     pub fn should_include(&self, partition_path: &str) -> bool {
         let segments = match self.parse_segments(partition_path) {
diff --git a/crates/core/tests/table_read_tests.rs 
b/crates/core/tests/table_read_tests.rs
index 77ee86f..3dcc51d 100644
--- a/crates/core/tests/table_read_tests.rs
+++ b/crates/core/tests/table_read_tests.rs
@@ -26,7 +26,7 @@ use hudi_core::config::read::HudiReadConfig;
 use hudi_core::config::util::empty_filters;
 use hudi_core::error::Result;
 use hudi_core::table::Table;
-use hudi_test::{QuickstartTripsTable, SampleTable};
+use hudi_test::{QuickstartTripsTable, SampleTable, SampleTableMdt};
 
 /// Test helper module for v6 tables (pre-1.0 spec)
 mod v6_tables {
@@ -550,3 +550,93 @@ mod v8_tables {
         }
     }
 }
+
+/// Test module for tables with metadata table (MDT) enabled.
+/// These tests verify MDT-accelerated file listing and partition 
normalization.
+mod mdt_enabled_tables {
+    use super::*;
+    use hudi_core::table::partition::PartitionPruner;
+
+    mod snapshot_queries {
+        use super::*;
+
+        /// Test reading a V8 MOR non-partitioned table with MDT enabled.
+        /// Verifies:
+        /// 1. Table can be read correctly via MDT file listing
+        /// 2. MDT partition key normalization ("." -> "") works correctly
+        /// 3. File slices are retrieved correctly from MDT
+        #[test]
+        fn test_v8_nonpartitioned_with_mdt() -> Result<()> {
+            let base_url = SampleTableMdt::V8Nonpartitioned.url_to_mor_avro();
+            let hudi_table = Table::new_blocking(base_url.path())?;
+
+            // Verify MDT is enabled
+            assert!(
+                hudi_table.is_metadata_table_enabled(),
+                "Metadata table should be enabled"
+            );
+
+            // Get file slices - this uses MDT file listing
+            let file_slices = 
hudi_table.get_file_slices_blocking(empty_filters())?;
+
+            // Should have file slices for the non-partitioned table
+            assert!(
+                !file_slices.is_empty(),
+                "Should have file slices from MDT listing"
+            );
+
+            // All file slices should be in the root partition (empty string)
+            for fs in &file_slices {
+                assert_eq!(
+                    &fs.partition_path, "",
+                    "Non-partitioned table should have files in root partition"
+                );
+            }
+
+            Ok(())
+        }
+
+        /// Test MDT partition key normalization for non-partitioned tables.
+        /// The metadata table stores "." as partition key, but external API 
should see "".
+        /// For non-partitioned tables, we use a fast path that directly 
fetches "." without
+        /// going through __all_partitions__ lookup.
+        #[test]
+        fn test_v8_nonpartitioned_mdt_partition_normalization() -> Result<()> {
+            let base_url = SampleTableMdt::V8Nonpartitioned.url_to_mor_avro();
+            let hudi_table = Table::new_blocking(base_url.path())?;
+
+            // Read MDT files partition records
+            let partition_pruner = PartitionPruner::empty();
+            let records =
+                
hudi_table.read_metadata_table_files_partition_blocking(&partition_pruner)?;
+
+            // For non-partitioned tables, the fast path only fetches the 
files record.
+            // __all_partitions__ is not fetched to avoid redundant HFile 
lookup.
+            assert_eq!(
+                records.len(),
+                1,
+                "Non-partitioned table fast path should only fetch files 
record"
+            );
+
+            // The files record should be keyed by "" (empty string)
+            // not "." (which is the internal MDT representation)
+            assert!(
+                records.contains_key(""),
+                "Non-partitioned table should have files record with empty 
string key"
+            );
+            assert!(
+                !records.contains_key("."),
+                "Non-partitioned table should NOT have files record with '.' 
key after normalization"
+            );
+
+            // Verify the files record has actual file entries
+            let files_record = records.get("").unwrap();
+            assert!(
+                !files_record.files.is_empty(),
+                "Files record should contain file entries"
+            );
+
+            Ok(())
+        }
+    }
+}
diff --git 
a/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.sql
 
b/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.sql
new file mode 100644
index 0000000..28bb624
--- /dev/null
+++ 
b/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.sql
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+-- V8 MOR non-partitioned table with metadata table enabled.
+
+CREATE TABLE v8_nonpartitioned (
+    id INT,
+    name STRING,
+    isActive BOOLEAN,
+    byteField BYTE,
+    shortField SHORT,
+    intField INT,
+    longField LONG,
+    floatField FLOAT,
+    doubleField DOUBLE,
+    decimalField DECIMAL(10,5),
+    dateField DATE,
+    timestampField TIMESTAMP,
+    binaryField BINARY
+) USING HUDI
+TBLPROPERTIES (
+    type = 'mor',
+    primaryKey = 'id',
+    preCombineField = 'longField',
+    'hoodie.metadata.enable' = 'true',
+    'hoodie.metadata.record.index.enable' = 'true',
+    'hoodie.parquet.small.file.limit' = '0'
+);
+
+-- Initial insert: 5 records
+INSERT INTO v8_nonpartitioned VALUES
+    (1, 'Alice', true, 1, 300, 15000, 1234567890, 1.0, 3.14159, 12345.67890,
+     CAST('2023-04-01' AS DATE), CAST('2023-04-01 12:01:00' AS TIMESTAMP), 
CAST('binary data' AS BINARY)),
+    (2, 'Bob', false, 0, 100, 25000, 9876543210, 2.0, 2.71828, 67890.12345,
+     CAST('2023-04-02' AS DATE), CAST('2023-04-02 13:02:00' AS TIMESTAMP), 
CAST('more binary data' AS BINARY)),
+    (3, 'Carol', true, 1, 200, 35000, 1928374650, 3.0, 1.41421, 11111.22222,
+     CAST('2023-04-03' AS DATE), CAST('2023-04-03 14:03:00' AS TIMESTAMP), 
CAST('even more binary data' AS BINARY)),
+    (4, 'Diana', true, 1, 500, 45000, 987654321, 4.0, 2.468, 65432.12345,
+     CAST('2023-04-04' AS DATE), CAST('2023-04-04 15:04:00' AS TIMESTAMP), 
CAST('new binary data' AS BINARY)),
+    (5, 'Eve', false, 0, 150, 55000, 1122334455, 5.0, 1.732, 99999.11111,
+     CAST('2023-04-05' AS DATE), CAST('2023-04-05 16:05:00' AS TIMESTAMP), 
CAST('eve binary data' AS BINARY));
+
+-- Create secondary index on name field
+CREATE INDEX name_idx ON v8_nonpartitioned (name);
+
+-- Update record 1: change isActive and increment longField
+UPDATE v8_nonpartitioned SET isActive = false, longField = 1234567891 WHERE id 
= 1;
+
+-- Delete record 2
+DELETE FROM v8_nonpartitioned WHERE id = 2;
+
+-- Update record 3: change doubleField
+UPDATE v8_nonpartitioned SET doubleField = 0.0, longField = 1928374651 WHERE 
id = 3;
+
+-- Delete record 3 (after update)
+DELETE FROM v8_nonpartitioned WHERE id = 3;
+
+-- Update record 4: change name
+UPDATE v8_nonpartitioned SET name = 'Diana Updated', longField = 987654322 
WHERE id = 4;
diff --git 
a/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.zip
 
b/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.zip
new file mode 100644
index 0000000..55130ad
Binary files /dev/null and 
b/crates/test/data/sample_table_use_mdt_basic_schema/mor/avro/v8_nonpartitioned.zip
 differ
diff --git a/crates/test/src/lib.rs b/crates/test/src/lib.rs
index 6842c81..8d786a4 100644
--- a/crates/test/src/lib.rs
+++ b/crates/test/src/lib.rs
@@ -122,6 +122,15 @@ pub enum SampleTable {
     V8TimebasedkeygenNonhivestyle,
 }
 
+/// Sample tables with metadata table (MDT) enabled and basic schema.
+#[allow(dead_code)]
+#[derive(Debug, EnumString, AsRefStr, EnumIter)]
+#[strum(serialize_all = "snake_case")]
+pub enum SampleTableMdt {
+    /// V8 MOR non-partitioned table with MDT enabled.
+    V8Nonpartitioned,
+}
+
 impl SampleTable {
     /// Return rows of columns (id, name, isActive) for the given 
[RecordBatch] order by id.
     pub fn sample_data_order_by_id(record_batch: &RecordBatch) -> Vec<(i32, 
&str, bool)> {
@@ -191,11 +200,39 @@ impl SampleTable {
     }
 }
 
+impl SampleTableMdt {
+    /// Return rows of columns (id, name, isActive) for the given 
[RecordBatch] order by id.
+    pub fn sample_data_order_by_id(record_batch: &RecordBatch) -> Vec<(i32, 
&str, bool)> {
+        SampleTable::sample_data_order_by_id(record_batch)
+    }
+
+    fn zip_path(&self, table_type: &str, log_format: Option<&str>) -> 
Box<Path> {
+        let dir = env!("CARGO_MANIFEST_DIR");
+        let data_path = Path::new(dir)
+            .join("data/sample_table_use_mdt_basic_schema")
+            .join(table_type.to_lowercase())
+            .join(log_format.unwrap_or_default())
+            .join(format!("{}.zip", self.as_ref()));
+        data_path.into_boxed_path()
+    }
+
+    pub fn path_to_mor_avro(&self) -> String {
+        let zip_path = self.zip_path("mor", Some("avro"));
+        let path_buf = 
extract_test_table(zip_path.as_ref()).join(self.as_ref());
+        path_buf.to_str().unwrap().to_string()
+    }
+
+    pub fn url_to_mor_avro(&self) -> Url {
+        let path = self.path_to_mor_avro();
+        Url::from_file_path(path).unwrap()
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use strum::IntoEnumIterator;
 
-    use crate::{QuickstartTripsTable, SampleTable};
+    use crate::{QuickstartTripsTable, SampleTable, SampleTableMdt};
 
     #[test]
     fn quickstart_trips_table_zip_file_should_exist() {
@@ -241,4 +278,16 @@ mod tests {
             }
         }
     }
+
+    #[test]
+    fn sample_table_mdt_zip_file_should_exist() {
+        for t in SampleTableMdt::iter() {
+            match t {
+                SampleTableMdt::V8Nonpartitioned => {
+                    let path = t.zip_path("mor", Some("avro"));
+                    assert!(path.exists(), "zip file should exist: {:?}", 
path);
+                }
+            }
+        }
+    }
 }

Reply via email to