This is an automated email from the ASF dual-hosted git repository.

timsaucer pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new afc1c72a15 [branch-52] FFI_TableOptions are using default values only 
(#20705)
afc1c72a15 is described below

commit afc1c72a15bdd31e15a7e354e86a505be7882f08
Author: Tim Saucer <[email protected]>
AuthorDate: Thu Mar 5 12:22:47 2026 -0500

    [branch-52] FFI_TableOptions are using default values only (#20705)
    
    ## Which issue does this PR close?
    
    - Addresses part of https://github.com/apache/datafusion/issues/20704
    
    ## Rationale for this change
    
    FFI_TableOptions fails with a warning that is getting swallowed in the
    unit tests.
    
    ## What changes are included in this PR?
    
    Correctly check format for table options.
    
    ## Are these changes tested?
    
    Unit test now passes with patch described in topic.
    
    ## Are there any user-facing changes?
    
    None, internal only.
---
 datafusion/ffi/src/session/mod.rs | 107 +++++++++++++++++++++++++++++++++-----
 1 file changed, 95 insertions(+), 12 deletions(-)

diff --git a/datafusion/ffi/src/session/mod.rs 
b/datafusion/ffi/src/session/mod.rs
index aa910abb91..6b8664a437 100644
--- a/datafusion/ffi/src/session/mod.rs
+++ b/datafusion/ffi/src/session/mod.rs
@@ -26,7 +26,7 @@ use arrow_schema::SchemaRef;
 use arrow_schema::ffi::FFI_ArrowSchema;
 use async_ffi::{FfiFuture, FutureExt};
 use async_trait::async_trait;
-use datafusion_common::config::{ConfigOptions, TableOptions};
+use datafusion_common::config::{ConfigFileType, ConfigOptions, TableOptions};
 use datafusion_common::{DFSchema, DataFusionError};
 use datafusion_execution::TaskContext;
 use datafusion_execution::config::SessionConfig;
@@ -240,12 +240,30 @@ unsafe extern "C" fn window_functions_fn_wrapper(
         .collect()
 }
 
-fn table_options_to_rhash(options: &TableOptions) -> RHashMap<RString, 
RString> {
-    options
+fn table_options_to_rhash(mut options: TableOptions) -> RHashMap<RString, 
RString> {
+    // It is important that we mutate options here and set current format
+    // to None so that when we call `entries()` we get ALL format entries.
+    // We will pass current_format as a special case and strip it on the
+    // other side of the boundary.
+    let current_format = options.current_format.take();
+    let mut options: HashMap<RString, RString> = options
         .entries()
         .into_iter()
         .filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into())))
-        .collect()
+        .collect();
+    if let Some(current_format) = current_format {
+        options.insert(
+            "datafusion_ffi.table_current_format".into(),
+            match current_format {
+                ConfigFileType::JSON => "json",
+                ConfigFileType::PARQUET => "parquet",
+                ConfigFileType::CSV => "csv",
+            }
+            .into(),
+        );
+    }
+
+    options.into()
 }
 
 unsafe extern "C" fn table_options_fn_wrapper(
@@ -253,7 +271,7 @@ unsafe extern "C" fn table_options_fn_wrapper(
 ) -> RHashMap<RString, RString> {
     let session = session.inner();
     let table_options = session.table_options();
-    table_options_to_rhash(table_options)
+    table_options_to_rhash(table_options.clone())
 }
 
 unsafe extern "C" fn default_table_options_fn_wrapper(
@@ -262,7 +280,7 @@ unsafe extern "C" fn default_table_options_fn_wrapper(
     let session = session.inner();
     let table_options = session.default_table_options();
 
-    table_options_to_rhash(&table_options)
+    table_options_to_rhash(table_options)
 }
 
 unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) -> 
FFI_TaskContext {
@@ -438,15 +456,70 @@ impl Clone for FFI_SessionRef {
 }
 
 fn table_options_from_rhashmap(options: RHashMap<RString, RString>) -> 
TableOptions {
-    let options = options
+    let mut options: HashMap<String, String> = options
         .into_iter()
         .map(|kv_pair| (kv_pair.0.into_string(), kv_pair.1.into_string()))
         .collect();
+    let current_format = options.remove("datafusion_ffi.table_current_format");
+
+    let mut table_options = TableOptions::default();
+    let formats = [
+        ConfigFileType::CSV,
+        ConfigFileType::JSON,
+        ConfigFileType::PARQUET,
+    ];
+    for format in formats {
+        // It is imperative that if new enum variants are added below that 
they be
+        // included in the formats list above and in the extension check below.
+        let format_name = match &format {
+            ConfigFileType::CSV => "csv",
+            ConfigFileType::PARQUET => "parquet",
+            ConfigFileType::JSON => "json",
+        };
+        let format_options: HashMap<String, String> = options
+            .iter()
+            .filter_map(|(k, v)| {
+                let (prefix, key) = k.split_once(".")?;
+                if prefix == format_name {
+                    Some((format!("format.{key}"), v.to_owned()))
+                } else {
+                    None
+                }
+            })
+            .collect();
+        if !format_options.is_empty() {
+            table_options.current_format = Some(format.clone());
+            table_options
+                .alter_with_string_hash_map(&format_options)
+                .unwrap_or_else(|err| log::warn!("Error parsing table options: 
{err}"));
+        }
+    }
+
+    let extension_options: HashMap<String, String> = options
+        .iter()
+        .filter_map(|(k, v)| {
+            let (prefix, _) = k.split_once(".")?;
+            if !["json", "parquet", "csv"].contains(&prefix) {
+                Some((k.to_owned(), v.to_owned()))
+            } else {
+                None
+            }
+        })
+        .collect();
+    if !extension_options.is_empty() {
+        table_options
+            .alter_with_string_hash_map(&extension_options)
+            .unwrap_or_else(|err| log::warn!("Error parsing table options: 
{err}"));
+    }
 
-    TableOptions::from_string_hash_map(&options).unwrap_or_else(|err| {
-        log::warn!("Error parsing default table options: {err}");
-        TableOptions::default()
-    })
+    table_options.current_format =
+        current_format.and_then(|format| match format.as_str() {
+            "csv" => Some(ConfigFileType::CSV),
+            "parquet" => Some(ConfigFileType::PARQUET),
+            "json" => Some(ConfigFileType::JSON),
+            _ => None,
+        });
+    table_options
 }
 
 #[async_trait]
@@ -556,6 +629,7 @@ mod tests {
     use std::sync::Arc;
 
     use arrow_schema::{DataType, Field, Schema};
+    use datafusion::execution::SessionStateBuilder;
     use datafusion_common::DataFusionError;
     use datafusion_expr::col;
     use datafusion_expr::registry::FunctionRegistry;
@@ -566,7 +640,16 @@ mod tests {
     #[tokio::test]
     async fn test_ffi_session() -> Result<(), DataFusionError> {
         let (ctx, task_ctx_provider) = 
crate::util::tests::test_session_and_ctx();
-        let state = ctx.state();
+        let mut table_options = TableOptions::default();
+        table_options.csv.has_header = Some(true);
+        table_options.json.schema_infer_max_rec = Some(10);
+        table_options.parquet.global.coerce_int96 = Some("123456789".into());
+        table_options.current_format = Some(ConfigFileType::JSON);
+
+        let state = SessionStateBuilder::new_from_existing(ctx.state())
+            .with_table_options(table_options)
+            .build();
+
         let logical_codec = FFI_LogicalExtensionCodec::new(
             Arc::new(DefaultLogicalExtensionCodec {}),
             None,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to