This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new afc1c72a15 [branch-52] FFI_TableOptions are using default values only
(#20705)
afc1c72a15 is described below
commit afc1c72a15bdd31e15a7e354e86a505be7882f08
Author: Tim Saucer <[email protected]>
AuthorDate: Thu Mar 5 12:22:47 2026 -0500
[branch-52] FFI_TableOptions are using default values only (#20705)
## Which issue does this PR close?
- Addresses part of https://github.com/apache/datafusion/issues/20704
## Rationale for this change
Converting `TableOptions` across the FFI boundary fails with a warning that is
swallowed in the unit tests, so the receiving side silently falls back to default values.
## What changes are included in this PR?
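Serialize the table options of every file format (CSV, JSON, Parquet), together with the current format, across the FFI boundary. On the receiving side, parse each format's options with that format selected.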
## Are these changes tested?
`test_ffi_session` now sets non-default CSV, JSON, and Parquet table options, and it passes when run with the patch described in the linked issue.
## Are there any user-facing changes?
None, internal only.
---
datafusion/ffi/src/session/mod.rs | 107 +++++++++++++++++++++++++++++++++-----
1 file changed, 95 insertions(+), 12 deletions(-)
diff --git a/datafusion/ffi/src/session/mod.rs
b/datafusion/ffi/src/session/mod.rs
index aa910abb91..6b8664a437 100644
--- a/datafusion/ffi/src/session/mod.rs
+++ b/datafusion/ffi/src/session/mod.rs
@@ -26,7 +26,7 @@ use arrow_schema::SchemaRef;
use arrow_schema::ffi::FFI_ArrowSchema;
use async_ffi::{FfiFuture, FutureExt};
use async_trait::async_trait;
-use datafusion_common::config::{ConfigOptions, TableOptions};
+use datafusion_common::config::{ConfigFileType, ConfigOptions, TableOptions};
use datafusion_common::{DFSchema, DataFusionError};
use datafusion_execution::TaskContext;
use datafusion_execution::config::SessionConfig;
@@ -240,12 +240,30 @@ unsafe extern "C" fn window_functions_fn_wrapper(
.collect()
}
-fn table_options_to_rhash(options: &TableOptions) -> RHashMap<RString,
RString> {
- options
+fn table_options_to_rhash(mut options: TableOptions) -> RHashMap<RString,
RString> {
+ // It is important that we mutate options here and set current format
+ // to None so that when we call `entries()` we get ALL format entries.
+ // We will pass current_format as a special case and strip it on the
+ // other side of the boundary.
+ let current_format = options.current_format.take();
+ let mut options: HashMap<RString, RString> = options
.entries()
.into_iter()
.filter_map(|entry| entry.value.map(|v| (entry.key.into(), v.into())))
- .collect()
+ .collect();
+ if let Some(current_format) = current_format {
+ options.insert(
+ "datafusion_ffi.table_current_format".into(),
+ match current_format {
+ ConfigFileType::JSON => "json",
+ ConfigFileType::PARQUET => "parquet",
+ ConfigFileType::CSV => "csv",
+ }
+ .into(),
+ );
+ }
+
+ options.into()
}
unsafe extern "C" fn table_options_fn_wrapper(
@@ -253,7 +271,7 @@ unsafe extern "C" fn table_options_fn_wrapper(
) -> RHashMap<RString, RString> {
let session = session.inner();
let table_options = session.table_options();
- table_options_to_rhash(table_options)
+ table_options_to_rhash(table_options.clone())
}
unsafe extern "C" fn default_table_options_fn_wrapper(
@@ -262,7 +280,7 @@ unsafe extern "C" fn default_table_options_fn_wrapper(
let session = session.inner();
let table_options = session.default_table_options();
- table_options_to_rhash(&table_options)
+ table_options_to_rhash(table_options)
}
unsafe extern "C" fn task_ctx_fn_wrapper(session: &FFI_SessionRef) ->
FFI_TaskContext {
@@ -438,15 +456,70 @@ impl Clone for FFI_SessionRef {
}
fn table_options_from_rhashmap(options: RHashMap<RString, RString>) ->
TableOptions {
- let options = options
+ let mut options: HashMap<String, String> = options
.into_iter()
.map(|kv_pair| (kv_pair.0.into_string(), kv_pair.1.into_string()))
.collect();
+ let current_format = options.remove("datafusion_ffi.table_current_format");
+
+ let mut table_options = TableOptions::default();
+ let formats = [
+ ConfigFileType::CSV,
+ ConfigFileType::JSON,
+ ConfigFileType::PARQUET,
+ ];
+ for format in formats {
+ // It is imperative that if new enum variants are added below that
they be
+ // included in the formats list above and in the extension check below.
+ let format_name = match &format {
+ ConfigFileType::CSV => "csv",
+ ConfigFileType::PARQUET => "parquet",
+ ConfigFileType::JSON => "json",
+ };
+ let format_options: HashMap<String, String> = options
+ .iter()
+ .filter_map(|(k, v)| {
+ let (prefix, key) = k.split_once(".")?;
+ if prefix == format_name {
+ Some((format!("format.{key}"), v.to_owned()))
+ } else {
+ None
+ }
+ })
+ .collect();
+ if !format_options.is_empty() {
+ table_options.current_format = Some(format.clone());
+ table_options
+ .alter_with_string_hash_map(&format_options)
+ .unwrap_or_else(|err| log::warn!("Error parsing table options:
{err}"));
+ }
+ }
+
+ let extension_options: HashMap<String, String> = options
+ .iter()
+ .filter_map(|(k, v)| {
+ let (prefix, _) = k.split_once(".")?;
+ if !["json", "parquet", "csv"].contains(&prefix) {
+ Some((k.to_owned(), v.to_owned()))
+ } else {
+ None
+ }
+ })
+ .collect();
+ if !extension_options.is_empty() {
+ table_options
+ .alter_with_string_hash_map(&extension_options)
+ .unwrap_or_else(|err| log::warn!("Error parsing table options:
{err}"));
+ }
- TableOptions::from_string_hash_map(&options).unwrap_or_else(|err| {
- log::warn!("Error parsing default table options: {err}");
- TableOptions::default()
- })
+ table_options.current_format =
+ current_format.and_then(|format| match format.as_str() {
+ "csv" => Some(ConfigFileType::CSV),
+ "parquet" => Some(ConfigFileType::PARQUET),
+ "json" => Some(ConfigFileType::JSON),
+ _ => None,
+ });
+ table_options
}
#[async_trait]
@@ -556,6 +629,7 @@ mod tests {
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};
+ use datafusion::execution::SessionStateBuilder;
use datafusion_common::DataFusionError;
use datafusion_expr::col;
use datafusion_expr::registry::FunctionRegistry;
@@ -566,7 +640,16 @@ mod tests {
#[tokio::test]
async fn test_ffi_session() -> Result<(), DataFusionError> {
let (ctx, task_ctx_provider) =
crate::util::tests::test_session_and_ctx();
- let state = ctx.state();
+ let mut table_options = TableOptions::default();
+ table_options.csv.has_header = Some(true);
+ table_options.json.schema_infer_max_rec = Some(10);
+ table_options.parquet.global.coerce_int96 = Some("123456789".into());
+ table_options.current_format = Some(ConfigFileType::JSON);
+
+ let state = SessionStateBuilder::new_from_existing(ctx.state())
+ .with_table_options(table_options)
+ .build();
+
let logical_codec = FFI_LogicalExtensionCodec::new(
Arc::new(DefaultLogicalExtensionCodec {}),
None,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]