Copilot commented on code in PR #537:
URL: https://github.com/apache/hudi-rs/pull/537#discussion_r2970218281


##########
crates/core/src/schema/mod.rs:
##########
@@ -36,6 +38,48 @@ pub fn prepend_meta_fields_with_operation(schema: SchemaRef) 
-> Result<Schema> {
         .map_err(CoreError::ArrowError)
 }
 
+pub fn prepend_meta_fields_to_avro_schema_str(avro_schema_str: &str) -> 
Result<String> {
+    let mut schema: Value = 
serde_json::from_str(&sanitize_avro_schema_str(avro_schema_str))
+        .map_err(|e| CoreError::Schema(format!("Failed to parse Avro schema 
JSON: {e}")))?;
+
+    let fields = schema
+        .get_mut("fields")
+        .and_then(|f| f.as_array_mut())
+        .ok_or_else(|| CoreError::Schema("Avro schema has no 'fields' 
array".to_string()))?;
+
+    let meta_field_defs: Vec<Value> = MetaField::field_names()
+        .iter()
+        .map(|name| {
+            serde_json::json!({
+                "name": name,
+                "type": ["null", "string"],
+                "default": null
+            })
+        })
+        .collect();
+
+    let existing_names: std::collections::HashSet<&str> = fields
+        .iter()
+        .filter_map(|f| f.get("name").and_then(|n| n.as_str()))
+        .collect();
+
+    let new_meta_fields: Vec<Value> = meta_field_defs
+        .into_iter()
+        .filter(|f| {
+            f.get("name")
+                .and_then(|n| n.as_str())
+                .is_none_or(|name| !existing_names.contains(name))
+        })
+        .collect();
+
+    let mut all_fields = new_meta_fields;
+    all_fields.append(fields);
+    *schema.get_mut("fields").unwrap() = Value::Array(all_fields);

Review Comment:
   This function uses `schema.get_mut("fields").unwrap()` after already 
validating the presence of the `fields` array. To avoid panics in library code, 
reuse the already-validated `fields` reference or propagate an error instead of 
unwrapping.
   ```suggestion
       *fields = all_fields;
   ```



##########
crates/datafusion/src/lib.rs:
##########
@@ -269,7 +269,7 @@ impl TableProvider for HudiDataSource {
         let table = self.table.clone();
         let handle = thread::spawn(move || {
             let rt = tokio::runtime::Runtime::new().unwrap();
-            rt.block_on(async { table.get_schema().await })
+            rt.block_on(async { table.get_schema_with_meta_fields().await })
         });
         let result = handle.join().unwrap().unwrap_or_else(|_| 
Schema::empty());
         SchemaRef::from(result)

Review Comment:
   `TableProvider::schema()` contains multiple `unwrap()` calls (runtime 
creation, thread join) that can panic in library code. Please replace these 
with non-panicking error handling (e.g., return an empty schema without 
panicking and log the error, or cache a schema computed during construction) so 
query planning can’t crash the process.



##########
cpp/src/lib.rs:
##########
@@ -80,10 +81,14 @@ pub fn new_file_group_reader_with_options(
         }
     }
 
-    let reader = FileGroupReader::new_with_options(base_uri, opt_vec)
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .expect("Failed to create tokio runtime");
+    let reader = rt
+        .block_on(FileGroupReader::new_with_options(base_uri, opt_vec))
         .expect("Failed to create FileGroupReader with options");
-    let reader_wrapper = HudiFileGroupReader { inner: reader };
-    Box::new(reader_wrapper)
+    Box::new(HudiFileGroupReader { inner: reader, rt })

Review Comment:
   This C++ FFI constructor can panic on two paths: the `expect(...)` on tokio 
runtime creation and the `expect(...)` on the `block_on` result. Panics crossing an 
FFI boundary can abort the process; prefer returning an error to C++ (e.g., via 
`cxx::Result` / an out-param error string) instead of panicking.



##########
python/src/internal.rs:
##########
@@ -86,7 +86,11 @@ impl HudiFileGroupReader {
         base_uri: &str,
         options: Option<HashMap<String, String>>,
     ) -> PyResult<Self> {
-        let inner = FileGroupReader::new_with_options(base_uri, 
options.unwrap_or_default())
+        let inner = rt()
+            .block_on(FileGroupReader::new_with_options(
+                base_uri,
+                options.unwrap_or_default(),
+            ))
             .map_err(PythonError::from)?;

Review Comment:
   `#[new] fn new_with_options` calls `rt().block_on(...)` while the Python GIL 
is held (no `py.detach()`), which can block other Python threads during option 
resolution / I/O. Consider adding a `py: Python<'_>` parameter to the 
constructor and wrapping the blocking call in `py.detach(...)`, consistent with 
the other binding methods.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to