This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 441decb  fix: avoid dashmap deadlock for correlated queries (#539)
441decb is described below

commit 441decb3ae36b73b6cdcd276de4cce21f0fa8bdf
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Mar 9 10:30:38 2026 -0500

    fix: avoid dashmap deadlock for correlated queries (#539)
---
 crates/core/src/schema/delete.rs | 31 ++++++++++++-------------------
 crates/core/src/table/fs_view.rs | 18 +++++++++++++-----
 2 files changed, 25 insertions(+), 24 deletions(-)

diff --git a/crates/core/src/schema/delete.rs b/crates/core/src/schema/delete.rs
index 7f5efd1..351e9dc 100644
--- a/crates/core/src/schema/delete.rs
+++ b/crates/core/src/schema/delete.rs
@@ -25,20 +25,15 @@ use arrow_array::{RecordBatch, StringArray};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use once_cell::sync::Lazy;
 use serde_json::Value as JsonValue;
-use std::fs;
-use std::fs::File;
-use std::path::PathBuf;
 use std::sync::Arc;
 
-static DELETE_RECORD_AVRO_SCHEMA_IN_JSON: Lazy<Result<JsonValue>> = Lazy::new(|| {
-    let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
-        .join("schemas")
-        .join("HoodieDeleteRecord.avsc");
-
-    let content = fs::read_to_string(schema_path)
-        .map_err(|e| CoreError::Schema(format!("Failed to read schema file: {e}")))?;
+static DELETE_RECORD_AVRO_SCHEMA_STR: &str = include_str!(concat!(
+    env!("CARGO_MANIFEST_DIR"),
+    "/schemas/HoodieDeleteRecord.avsc"
+));
 
-    serde_json::from_str(&content)
+static DELETE_RECORD_AVRO_SCHEMA_IN_JSON: Lazy<Result<JsonValue>> = Lazy::new(|| {
+    serde_json::from_str(DELETE_RECORD_AVRO_SCHEMA_STR)
        .map_err(|e| CoreError::Schema(format!("Failed to parse schema to JSON: {e}")))
 });
 
@@ -110,15 +105,13 @@ pub fn avro_schema_for_delete_record(delete_record_value: &AvroValue) -> Result<
     AvroSchema::parse(&json).map_err(CoreError::AvroError)
 }
 
-static DELETE_RECORD_LIST_AVRO_SCHEMA: Lazy<Result<AvroSchema>> = Lazy::new(|| {
-    let schema_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
-        .join("schemas")
-        .join("HoodieDeleteRecordList.avsc");
-
-    let mut file = File::open(&schema_path)
-        .map_err(|e| CoreError::Schema(format!("Failed to open schema file: {e}")))?;
+static DELETE_RECORD_LIST_AVRO_SCHEMA_STR: &str = include_str!(concat!(
+    env!("CARGO_MANIFEST_DIR"),
+    "/schemas/HoodieDeleteRecordList.avsc"
+));
 
-    AvroSchema::parse_reader(&mut file).map_err(CoreError::AvroError)
+static DELETE_RECORD_LIST_AVRO_SCHEMA: Lazy<Result<AvroSchema>> = Lazy::new(|| {
+    AvroSchema::parse_str(DELETE_RECORD_LIST_AVRO_SCHEMA_STR).map_err(CoreError::AvroError)
 });
 
 pub fn avro_schema_for_delete_record_list() -> Result<&'static AvroSchema> {
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 223a86f..a3740c1 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -210,6 +210,10 @@ impl FileSystemView {
     }
 
     /// Collect file slices from loaded file groups using the timeline view.
+    ///
+    /// File slices are first collected from the DashMap using read locks (released
+    /// promptly), then metadata is loaded on the owned clones without holding any
+    /// locks.
     async fn collect_file_slices(
         &self,
         partition_pruner: &PartitionPruner,
@@ -219,21 +223,25 @@ impl FileSystemView {
         let excluding_file_groups = timeline_view.excluding_file_groups();
 
         let mut file_slices = Vec::new();
-        for mut partition_entry in self.partition_to_file_groups.iter_mut() {
+        for partition_entry in self.partition_to_file_groups.iter() {
             if !partition_pruner.should_include(partition_entry.key()) {
                 continue;
             }
-            let file_groups = partition_entry.value_mut();
-            for fg in file_groups.iter_mut() {
+            let file_groups = partition_entry.value();
+            for fg in file_groups.iter() {
                 if excluding_file_groups.contains(fg) {
                     continue;
                 }
-                if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
-                    fsl.load_metadata_if_needed(&self.storage).await?;
+                if let Some(fsl) = fg.get_file_slice_as_of(timestamp) {
                     file_slices.push(fsl.clone());
                 }
             }
         }
+
+        for fsl in &mut file_slices {
+            fsl.load_metadata_if_needed(&self.storage).await?;
+        }
+
         Ok(file_slices)
     }
 

Reply via email to