This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new bfa14f4f refactor(context): deduplicate register/read option-building logic (#1479)
bfa14f4f is described below
commit bfa14f4ffa879c83acfab2f1d480d9ed474baf7d
Author: Daniel Mesejo <[email protected]>
AuthorDate: Fri Jun 5 18:09:17 2026 +0200
refactor(context): deduplicate register/read option-building logic (#1479)
* refactor(context): deduplicate register/read option-building logic
Extract shared helpers (convert_partition_cols, convert_file_sort_order,
build_parquet/json/avro_options, convert_csv_options), standardize path
types to &str, and remove redundant intermediate variables.
* refactor(context): accept PathBuf for path arguments in register/read methods
Change path parameters from &str to PathBuf in all register/read methods
(register_listing_table, register_parquet, register_json, register_avro,
register_arrow, read_json, read_parquet, read_avro, read_arrow) so callers
can pass either a Python str or a pathlib.Path object. For register_csv and
read_csv, which take &Bound<PyAny> to handle lists, extract path elements as
PathBuf rather than String for the same reason.
Add a path_to_str helper that converts PathBuf to &str, returning an explicit
error for non-UTF-8 paths rather than silently corrupting them.
Add build_arrow_options helper to deduplicate register_arrow/read_arrow
option-building logic, consistent with the existing parquet/json/avro
helpers.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
---------
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
crates/core/src/context.rs | 363 ++++++++++++++++++++++---------------------
python/datafusion/context.py | 28 ++--
2 files changed, 196 insertions(+), 195 deletions(-)
diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs
index ce662938..0db49625 100644
--- a/crates/core/src/context.rs
+++ b/crates/core/src/context.rs
@@ -16,7 +16,7 @@
// under the License.
use std::collections::{HashMap, HashSet};
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
use std::ptr::NonNull;
use std::str::FromStr;
use std::sync::Arc;
@@ -477,7 +477,7 @@ impl PySessionContext {
pub fn register_listing_table(
&self,
name: &str,
- path: &str,
+ path: PathBuf,
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
file_extension: &str,
schema: Option<PyArrowType<Schema>>,
@@ -486,20 +486,9 @@ impl PySessionContext {
) -> PyDataFusionResult<()> {
let options = ListingOptions::new(Arc::new(ParquetFormat::new()))
.with_file_extension(file_extension)
- .with_table_partition_cols(
- table_partition_cols
- .into_iter()
- .map(|(name, ty)| (name, ty.0))
- .collect::<Vec<(String, DataType)>>(),
- )
- .with_file_sort_order(
- file_sort_order
- .unwrap_or_default()
- .into_iter()
- .map(|e| e.into_iter().map(|f| f.into()).collect())
- .collect(),
- );
- let table_path = ListingTableUrl::parse(path)?;
+
.with_table_partition_cols(convert_partition_cols(table_partition_cols))
+ .with_file_sort_order(convert_file_sort_order(file_sort_order));
+ let table_path = ListingTableUrl::parse(path_to_str(&path)?)?;
let resolved_schema: SchemaRef = match schema {
Some(s) => Arc::new(s.0),
None => {
@@ -866,7 +855,7 @@ impl PySessionContext {
pub fn register_parquet(
&self,
name: &str,
- path: &str,
+ path: PathBuf,
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
parquet_pruning: bool,
file_extension: &str,
@@ -875,25 +864,19 @@ impl PySessionContext {
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
py: Python,
) -> PyDataFusionResult<()> {
- let mut options = ParquetReadOptions::default()
- .table_partition_cols(
- table_partition_cols
- .into_iter()
- .map(|(name, ty)| (name, ty.0))
- .collect::<Vec<(String, DataType)>>(),
- )
- .parquet_pruning(parquet_pruning)
- .skip_metadata(skip_metadata);
- options.file_extension = file_extension;
- options.schema = schema.as_ref().map(|x| &x.0);
- options.file_sort_order = file_sort_order
- .unwrap_or_default()
- .into_iter()
- .map(|e| e.into_iter().map(|f| f.into()).collect())
- .collect();
-
- let result = self.ctx.register_parquet(name, path, options);
- wait_for_future(py, result)??;
+ let options = build_parquet_options(
+ table_partition_cols,
+ parquet_pruning,
+ file_extension,
+ skip_metadata,
+ &schema,
+ file_sort_order,
+ );
+ wait_for_future(
+ py,
+ self.ctx
+ .register_parquet(name, path_to_str(&path)?, options),
+ )??;
Ok(())
}
@@ -907,19 +890,24 @@ impl PySessionContext {
options: Option<&PyCsvReadOptions>,
py: Python,
) -> PyDataFusionResult<()> {
- let options = options
- .map(|opts| opts.try_into())
- .transpose()?
- .unwrap_or_default();
+ let options = convert_csv_options(options)?;
if path.is_instance_of::<PyList>() {
- let paths = path.extract::<Vec<String>>()?;
- let result = self.register_csv_from_multiple_paths(name, paths,
options);
- wait_for_future(py, result)??;
+ let paths = path
+ .extract::<Vec<PathBuf>>()?
+ .iter()
+ .map(|p| path_to_str(p).map(str::to_owned))
+ .collect::<PyDataFusionResult<Vec<_>>>()?;
+ wait_for_future(
+ py,
+ self.register_csv_from_multiple_paths(name, paths, options),
+ )??;
} else {
- let path = path.extract::<String>()?;
- let result = self.ctx.register_csv(name, &path, options);
- wait_for_future(py, result)??;
+ let path = path.extract::<PathBuf>()?;
+ wait_for_future(
+ py,
+ self.ctx.register_csv(name, path_to_str(&path)?, options),
+ )??;
}
Ok(())
@@ -944,25 +932,17 @@ impl PySessionContext {
file_compression_type: Option<String>,
py: Python,
) -> PyDataFusionResult<()> {
- let path = path
- .to_str()
- .ok_or_else(|| PyValueError::new_err("Unable to convert path to a
string"))?;
-
- let mut options = JsonReadOptions::default()
-
.file_compression_type(parse_file_compression_type(file_compression_type)?)
- .table_partition_cols(
- table_partition_cols
- .into_iter()
- .map(|(name, ty)| (name, ty.0))
- .collect::<Vec<(String, DataType)>>(),
- );
- options.schema_infer_max_records = schema_infer_max_records;
- options.file_extension = file_extension;
- options.schema = schema.as_ref().map(|x| &x.0);
-
- let result = self.ctx.register_json(name, path, options);
- wait_for_future(py, result)??;
-
+ let options = build_json_options(
+ table_partition_cols,
+ file_compression_type,
+ schema_infer_max_records,
+ file_extension,
+ &schema,
+ )?;
+ wait_for_future(
+ py,
+ self.ctx.register_json(name, path_to_str(&path)?, options),
+ )??;
Ok(())
}
@@ -981,22 +961,11 @@ impl PySessionContext {
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
py: Python,
) -> PyDataFusionResult<()> {
- let path = path
- .to_str()
- .ok_or_else(|| PyValueError::new_err("Unable to convert path to a
string"))?;
-
- let mut options = AvroReadOptions::default().table_partition_cols(
- table_partition_cols
- .into_iter()
- .map(|(name, ty)| (name, ty.0))
- .collect::<Vec<(String, DataType)>>(),
- );
- options.file_extension = file_extension;
- options.schema = schema.as_ref().map(|x| &x.0);
-
- let result = self.ctx.register_avro(name, path, options);
- wait_for_future(py, result)??;
-
+ let options = build_avro_options(table_partition_cols, file_extension,
&schema);
+ wait_for_future(
+ py,
+ self.ctx.register_avro(name, path_to_str(&path)?, options),
+ )??;
Ok(())
}
@@ -1004,23 +973,17 @@ impl PySessionContext {
pub fn register_arrow(
&self,
name: &str,
- path: &str,
+ path: PathBuf,
schema: Option<PyArrowType<Schema>>,
file_extension: &str,
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
py: Python,
) -> PyDataFusionResult<()> {
- let mut options = ArrowReadOptions::default().table_partition_cols(
- table_partition_cols
- .into_iter()
- .map(|(name, ty)| (name, ty.0))
- .collect::<Vec<(String, DataType)>>(),
- );
- options.file_extension = file_extension;
- options.schema = schema.as_ref().map(|x| &x.0);
-
- let result = self.ctx.register_arrow(name, path, options);
- wait_for_future(py, result)??;
+ let options = build_arrow_options(table_partition_cols,
file_extension, &schema);
+ wait_for_future(
+ py,
+ self.ctx.register_arrow(name, path_to_str(&path)?, options),
+ )??;
Ok(())
}
@@ -1242,27 +1205,14 @@ impl PySessionContext {
file_compression_type: Option<String>,
py: Python,
) -> PyDataFusionResult<PyDataFrame> {
- let path = path
- .to_str()
- .ok_or_else(|| PyValueError::new_err("Unable to convert path to a
string"))?;
- let mut options = JsonReadOptions::default()
- .table_partition_cols(
- table_partition_cols
- .into_iter()
- .map(|(name, ty)| (name, ty.0))
- .collect::<Vec<(String, DataType)>>(),
- )
-
.file_compression_type(parse_file_compression_type(file_compression_type)?);
- options.schema_infer_max_records = schema_infer_max_records;
- options.file_extension = file_extension;
- let df = if let Some(schema) = schema {
- options.schema = Some(&schema.0);
- let result = self.ctx.read_json(path, options);
- wait_for_future(py, result)??
- } else {
- let result = self.ctx.read_json(path, options);
- wait_for_future(py, result)??
- };
+ let options = build_json_options(
+ table_partition_cols,
+ file_compression_type,
+ schema_infer_max_records,
+ file_extension,
+ &schema,
+ )?;
+ let df = wait_for_future(py, self.ctx.read_json(path_to_str(&path)?,
options))??;
Ok(PyDataFrame::new(df))
}
@@ -1275,23 +1225,18 @@ impl PySessionContext {
options: Option<&PyCsvReadOptions>,
py: Python,
) -> PyDataFusionResult<PyDataFrame> {
- let options = options
- .map(|opts| opts.try_into())
- .transpose()?
- .unwrap_or_default();
+ let options = convert_csv_options(options)?;
- if path.is_instance_of::<PyList>() {
- let paths = path.extract::<Vec<String>>()?;
- let paths = paths.iter().map(|p| p as &str).collect::<Vec<&str>>();
- let result = self.ctx.read_csv(paths, options);
- let df = PyDataFrame::new(wait_for_future(py, result)??);
- Ok(df)
+ let paths: Vec<String> = if path.is_instance_of::<PyList>() {
+ path.extract::<Vec<PathBuf>>()?
+ .iter()
+ .map(|p| path_to_str(p).map(str::to_owned))
+ .collect::<PyDataFusionResult<_>>()?
} else {
- let path = path.extract::<String>()?;
- let result = self.ctx.read_csv(path, options);
- let df = PyDataFrame::new(wait_for_future(py, result)??);
- Ok(df)
- }
+ vec![path_to_str(&path.extract::<PathBuf>()?)?.to_owned()]
+ };
+ let df = wait_for_future(py, self.ctx.read_csv(paths, options))??;
+ Ok(PyDataFrame::new(df))
}
#[allow(clippy::too_many_arguments)]
@@ -1305,7 +1250,7 @@ impl PySessionContext {
file_sort_order=None))]
pub fn read_parquet(
&self,
- path: &str,
+ path: PathBuf,
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
parquet_pruning: bool,
file_extension: &str,
@@ -1314,25 +1259,18 @@ impl PySessionContext {
file_sort_order: Option<Vec<Vec<PySortExpr>>>,
py: Python,
) -> PyDataFusionResult<PyDataFrame> {
- let mut options = ParquetReadOptions::default()
- .table_partition_cols(
- table_partition_cols
- .into_iter()
- .map(|(name, ty)| (name, ty.0))
- .collect::<Vec<(String, DataType)>>(),
- )
- .parquet_pruning(parquet_pruning)
- .skip_metadata(skip_metadata);
- options.file_extension = file_extension;
- options.schema = schema.as_ref().map(|x| &x.0);
- options.file_sort_order = file_sort_order
- .unwrap_or_default()
- .into_iter()
- .map(|e| e.into_iter().map(|f| f.into()).collect())
- .collect();
-
- let result = self.ctx.read_parquet(path, options);
- let df = PyDataFrame::new(wait_for_future(py, result)??);
+ let options = build_parquet_options(
+ table_partition_cols,
+ parquet_pruning,
+ file_extension,
+ skip_metadata,
+ &schema,
+ file_sort_order,
+ );
+ let df = PyDataFrame::new(wait_for_future(
+ py,
+ self.ctx.read_parquet(path_to_str(&path)?, options),
+ )??);
Ok(df)
}
@@ -1340,50 +1278,28 @@ impl PySessionContext {
#[pyo3(signature = (path, schema=None, table_partition_cols=vec![],
file_extension=".avro"))]
pub fn read_avro(
&self,
- path: &str,
+ path: PathBuf,
schema: Option<PyArrowType<Schema>>,
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
file_extension: &str,
py: Python,
) -> PyDataFusionResult<PyDataFrame> {
- let mut options = AvroReadOptions::default().table_partition_cols(
- table_partition_cols
- .into_iter()
- .map(|(name, ty)| (name, ty.0))
- .collect::<Vec<(String, DataType)>>(),
- );
- options.file_extension = file_extension;
- let df = if let Some(schema) = schema {
- options.schema = Some(&schema.0);
- let read_future = self.ctx.read_avro(path, options);
- wait_for_future(py, read_future)??
- } else {
- let read_future = self.ctx.read_avro(path, options);
- wait_for_future(py, read_future)??
- };
+ let options = build_avro_options(table_partition_cols, file_extension,
&schema);
+ let df = wait_for_future(py, self.ctx.read_avro(path_to_str(&path)?,
options))??;
Ok(PyDataFrame::new(df))
}
#[pyo3(signature = (path, schema=None, file_extension=".arrow",
table_partition_cols=vec![]))]
pub fn read_arrow(
&self,
- path: &str,
+ path: PathBuf,
schema: Option<PyArrowType<Schema>>,
file_extension: &str,
table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
py: Python,
) -> PyDataFusionResult<PyDataFrame> {
- let mut options = ArrowReadOptions::default().table_partition_cols(
- table_partition_cols
- .into_iter()
- .map(|(name, ty)| (name, ty.0))
- .collect::<Vec<(String, DataType)>>(),
- );
- options.file_extension = file_extension;
- options.schema = schema.as_ref().map(|x| &x.0);
-
- let result = self.ctx.read_arrow(path, options);
- let df = wait_for_future(py, result)??;
+ let options = build_arrow_options(table_partition_cols,
file_extension, &schema);
+ let df = wait_for_future(py, self.ctx.read_arrow(path_to_str(&path)?,
options))??;
Ok(PyDataFrame::new(df))
}
@@ -1523,7 +1439,7 @@ impl PySessionContext {
// check if the file extension matches the expected extension
for path in &table_paths {
let file_path = path.as_str();
- if !file_path.ends_with(option_extension.clone().as_str()) &&
!path.is_collection() {
+ if !file_path.ends_with(option_extension.as_str()) &&
!path.is_collection() {
return exec_err!(
"File path '{file_path}' does not match the expected
extension '{option_extension}'"
);
@@ -1594,6 +1510,97 @@ pub fn parse_file_compression_type(
})
}
+fn path_to_str(path: &Path) -> PyDataFusionResult<&str> {
+ path.to_str()
+ .ok_or_else(|| PyValueError::new_err("Unable to convert path to a
string").into())
+}
+
+fn convert_csv_options(
+ options: Option<&PyCsvReadOptions>,
+) -> PyDataFusionResult<CsvReadOptions<'_>> {
+ Ok(options
+ .map(|opts| opts.try_into())
+ .transpose()?
+ .unwrap_or_default())
+}
+
+fn convert_partition_cols(
+ table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
+) -> Vec<(String, DataType)> {
+ table_partition_cols
+ .into_iter()
+ .map(|(name, ty)| (name, ty.0))
+ .collect()
+}
+
+fn convert_file_sort_order(
+ file_sort_order: Option<Vec<Vec<PySortExpr>>>,
+) -> Vec<Vec<datafusion::logical_expr::SortExpr>> {
+ file_sort_order
+ .unwrap_or_default()
+ .into_iter()
+ .map(|e| e.into_iter().map(|f| f.into()).collect())
+ .collect()
+}
+
+fn build_parquet_options<'a>(
+ table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
+ parquet_pruning: bool,
+ file_extension: &'a str,
+ skip_metadata: bool,
+ schema: &'a Option<PyArrowType<Schema>>,
+ file_sort_order: Option<Vec<Vec<PySortExpr>>>,
+) -> ParquetReadOptions<'a> {
+ let mut options = ParquetReadOptions::default()
+ .table_partition_cols(convert_partition_cols(table_partition_cols))
+ .parquet_pruning(parquet_pruning)
+ .skip_metadata(skip_metadata);
+ options.file_extension = file_extension;
+ options.schema = schema.as_ref().map(|x| &x.0);
+ options.file_sort_order = convert_file_sort_order(file_sort_order);
+ options
+}
+
+fn build_json_options<'a>(
+ table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
+ file_compression_type: Option<String>,
+ schema_infer_max_records: usize,
+ file_extension: &'a str,
+ schema: &'a Option<PyArrowType<Schema>>,
+) -> Result<JsonReadOptions<'a>, PyErr> {
+ let mut options = JsonReadOptions::default()
+ .table_partition_cols(convert_partition_cols(table_partition_cols))
+
.file_compression_type(parse_file_compression_type(file_compression_type)?);
+ options.schema_infer_max_records = schema_infer_max_records;
+ options.file_extension = file_extension;
+ options.schema = schema.as_ref().map(|x| &x.0);
+ Ok(options)
+}
+
+fn build_arrow_options<'a>(
+ table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
+ file_extension: &'a str,
+ schema: &'a Option<PyArrowType<Schema>>,
+) -> ArrowReadOptions<'a> {
+ let mut options = ArrowReadOptions::default()
+ .table_partition_cols(convert_partition_cols(table_partition_cols));
+ options.file_extension = file_extension;
+ options.schema = schema.as_ref().map(|x| &x.0);
+ options
+}
+
+fn build_avro_options<'a>(
+ table_partition_cols: Vec<(String, PyArrowType<DataType>)>,
+ file_extension: &'a str,
+ schema: &'a Option<PyArrowType<Schema>>,
+) -> AvroReadOptions<'a> {
+ let mut options = AvroReadOptions::default()
+ .table_partition_cols(convert_partition_cols(table_partition_cols));
+ options.file_extension = file_extension;
+ options.schema = schema.as_ref().map(|x| &x.0);
+ options
+}
+
impl From<PySessionContext> for SessionContext {
fn from(ctx: PySessionContext) -> SessionContext {
ctx.ctx.as_ref().clone()
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index 3be32066..5dfeed71 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -641,7 +641,7 @@ class SessionContext:
table_partition_cols =
_convert_table_partition_cols(table_partition_cols)
self.ctx.register_listing_table(
name,
- str(path),
+ path,
table_partition_cols,
file_extension,
schema,
@@ -1055,7 +1055,7 @@ class SessionContext:
table_partition_cols =
_convert_table_partition_cols(table_partition_cols)
self.ctx.register_parquet(
name,
- str(path),
+ path,
table_partition_cols,
parquet_pruning,
file_extension,
@@ -1097,8 +1097,6 @@ class SessionContext:
options: Set advanced options for CSV reading. This cannot be
combined with any of the other options in this method.
"""
- path_arg = [str(p) for p in path] if isinstance(path, list) else
str(path)
-
if options is not None and (
schema is not None
or not has_header
@@ -1132,7 +1130,7 @@ class SessionContext:
self.ctx.register_csv(
name,
- path_arg,
+ path,
options.to_inner(),
)
@@ -1167,7 +1165,7 @@ class SessionContext:
table_partition_cols =
_convert_table_partition_cols(table_partition_cols)
self.ctx.register_json(
name,
- str(path),
+ path,
schema,
schema_infer_max_records,
file_extension,
@@ -1198,9 +1196,7 @@ class SessionContext:
if table_partition_cols is None:
table_partition_cols = []
table_partition_cols =
_convert_table_partition_cols(table_partition_cols)
- self.ctx.register_avro(
- name, str(path), schema, file_extension, table_partition_cols
- )
+ self.ctx.register_avro(name, path, schema, file_extension,
table_partition_cols)
def register_arrow(
self,
@@ -1279,7 +1275,7 @@ class SessionContext:
table_partition_cols = []
table_partition_cols =
_convert_table_partition_cols(table_partition_cols)
self.ctx.register_arrow(
- name, str(path), schema, file_extension, table_partition_cols
+ name, path, schema, file_extension, table_partition_cols
)
def register_dataset(self, name: str, dataset: pa.dataset.Dataset) -> None:
@@ -1693,7 +1689,7 @@ class SessionContext:
table_partition_cols =
_convert_table_partition_cols(table_partition_cols)
return DataFrame(
self.ctx.read_json(
- str(path),
+ path,
schema,
schema_infer_max_records,
file_extension,
@@ -1736,8 +1732,6 @@ class SessionContext:
Returns:
DataFrame representation of the read CSV files
"""
- path_arg = [str(p) for p in path] if isinstance(path, list) else
str(path)
-
if options is not None and (
schema is not None
or not has_header
@@ -1773,7 +1767,7 @@ class SessionContext:
return DataFrame(
self.ctx.read_csv(
- path_arg,
+ path,
options.to_inner(),
)
)
@@ -1816,7 +1810,7 @@ class SessionContext:
file_sort_order = self._convert_file_sort_order(file_sort_order)
return DataFrame(
self.ctx.read_parquet(
- str(path),
+ path,
table_partition_cols,
parquet_pruning,
file_extension,
@@ -1848,7 +1842,7 @@ class SessionContext:
file_partition_cols = []
file_partition_cols =
_convert_table_partition_cols(file_partition_cols)
return DataFrame(
- self.ctx.read_avro(str(path), schema, file_partition_cols,
file_extension)
+ self.ctx.read_avro(path, schema, file_partition_cols,
file_extension)
)
def read_arrow(
@@ -1920,7 +1914,7 @@ class SessionContext:
file_partition_cols = []
file_partition_cols =
_convert_table_partition_cols(file_partition_cols)
return DataFrame(
- self.ctx.read_arrow(str(path), schema, file_extension,
file_partition_cols)
+ self.ctx.read_arrow(path, schema, file_extension,
file_partition_cols)
)
def read_empty(self) -> DataFrame:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]