This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new f5fdf596 feat: expose DataFrame.write_table (#1264)
f5fdf596 is described below
commit f5fdf59663b075cb1dbaed5b87cca3b0092e5f3c
Author: Tim Saucer <[email protected]>
AuthorDate: Mon Oct 13 08:28:28 2025 -0400
feat: expose DataFrame.write_table (#1264)
* Initial commit for dataframe write_table
* Add dataframe writer options and docstring
* add csv write unit test
* add docstrings
* more testing around writer options
* Minor docstring change
Co-authored-by: Copilot <[email protected]>
* Format docstring so it renders better
* whitespace
* Resolve error on insert operation and add unit test coverage
* mark classes as frozen
---------
Co-authored-by: Copilot <[email protected]>
---
python/datafusion/__init__.py | 10 ++-
python/datafusion/dataframe.py | 135 +++++++++++++++++++++++++++++++++++-----
python/tests/test_dataframe.py | 89 +++++++++++++++++++++++++-
src/dataframe.rs | 138 ++++++++++++++++++++++++++++++++++++-----
src/lib.rs | 2 +
5 files changed, 339 insertions(+), 35 deletions(-)
diff --git a/python/datafusion/__init__.py b/python/datafusion/__init__.py
index 9ebd58ea..77765223 100644
--- a/python/datafusion/__init__.py
+++ b/python/datafusion/__init__.py
@@ -44,7 +44,13 @@ from .context import (
SessionContext,
SQLOptions,
)
-from .dataframe import DataFrame, ParquetColumnOptions, ParquetWriterOptions
+from .dataframe import (
+ DataFrame,
+ DataFrameWriteOptions,
+ InsertOp,
+ ParquetColumnOptions,
+ ParquetWriterOptions,
+)
from .dataframe_formatter import configure_formatter
from .expr import Expr, WindowFrame
from .io import read_avro, read_csv, read_json, read_parquet
@@ -71,9 +77,11 @@ __all__ = [
"Config",
"DFSchema",
"DataFrame",
+ "DataFrameWriteOptions",
"Database",
"ExecutionPlan",
"Expr",
+ "InsertOp",
"LogicalPlan",
"ParquetColumnOptions",
"ParquetWriterOptions",
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 5a21d773..16765656 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -39,10 +39,13 @@ except ImportError:
from typing_extensions import deprecated # Python 3.12
from datafusion._internal import DataFrame as DataFrameInternal
+from datafusion._internal import DataFrameWriteOptions as DataFrameWriteOptionsInternal
+from datafusion._internal import InsertOp as InsertOpInternal
from datafusion._internal import ParquetColumnOptions as ParquetColumnOptionsInternal
from datafusion._internal import ParquetWriterOptions as ParquetWriterOptionsInternal
from datafusion.expr import (
Expr,
+ SortExpr,
SortKey,
ensure_expr,
ensure_expr_list,
@@ -939,14 +942,23 @@ class DataFrame:
"""
return DataFrame(self.df.except_all(other.df))
- def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None:
+ def write_csv(
+ self,
+ path: str | pathlib.Path,
+ with_header: bool = False,
+ write_options: DataFrameWriteOptions | None = None,
+ ) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a CSV
file.
Args:
path: Path of the CSV file to write.
with_header: If true, output the CSV header row.
+ write_options: Options that impact how the DataFrame is written.
"""
- self.df.write_csv(str(path), with_header)
+ raw_write_options = (
+ write_options._raw_write_options if write_options is not None else None
+ )
+ self.df.write_csv(str(path), with_header, raw_write_options)
@overload
def write_parquet(
@@ -954,6 +966,7 @@ class DataFrame:
path: str | pathlib.Path,
compression: str,
compression_level: int | None = None,
+ write_options: DataFrameWriteOptions | None = None,
) -> None: ...
@overload
@@ -962,6 +975,7 @@ class DataFrame:
path: str | pathlib.Path,
compression: Compression = Compression.ZSTD,
compression_level: int | None = None,
+ write_options: DataFrameWriteOptions | None = None,
) -> None: ...
@overload
@@ -970,6 +984,7 @@ class DataFrame:
path: str | pathlib.Path,
compression: ParquetWriterOptions,
compression_level: None = None,
+ write_options: DataFrameWriteOptions | None = None,
) -> None: ...
def write_parquet(
@@ -977,24 +992,30 @@ class DataFrame:
path: str | pathlib.Path,
compression: Union[str, Compression, ParquetWriterOptions] = Compression.ZSTD,
compression_level: int | None = None,
+ write_options: DataFrameWriteOptions | None = None,
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a
Parquet file.
+ Available compression types are:
+
+ - "uncompressed": No compression.
+ - "snappy": Snappy compression.
+ - "gzip": Gzip compression.
+ - "brotli": Brotli compression.
+ - "lz4": LZ4 compression.
+ - "lz4_raw": LZ4_RAW compression.
+ - "zstd": Zstandard compression.
+
+ LZO compression is not yet implemented in arrow-rs and is therefore
+ excluded.
+
Args:
path: Path of the Parquet file to write.
compression: Compression type to use. Default is "ZSTD".
- Available compression types are:
- - "uncompressed": No compression.
- - "snappy": Snappy compression.
- - "gzip": Gzip compression.
- - "brotli": Brotli compression.
- - "lz4": LZ4 compression.
- - "lz4_raw": LZ4_RAW compression.
- - "zstd": Zstandard compression.
- Note: LZO is not yet implemented in arrow-rs and is therefore excluded.
compression_level: Compression level to use. For ZSTD, the
recommended range is 1 to 22, with the default being 4. Higher
levels
provide better compression but slower speed.
+ write_options: Options that impact how the DataFrame is written.
"""
if isinstance(compression, ParquetWriterOptions):
if compression_level is not None:
@@ -1012,10 +1033,21 @@ class DataFrame:
):
compression_level = compression.get_default_level()
- self.df.write_parquet(str(path), compression.value, compression_level)
+ raw_write_options = (
+ write_options._raw_write_options if write_options is not None else None
+ )
+ self.df.write_parquet(
+ str(path),
+ compression.value,
+ compression_level,
+ raw_write_options,
+ )
def write_parquet_with_options(
- self, path: str | pathlib.Path, options: ParquetWriterOptions
+ self,
+ path: str | pathlib.Path,
+ options: ParquetWriterOptions,
+ write_options: DataFrameWriteOptions | None = None,
) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a
Parquet file.
@@ -1024,6 +1056,7 @@ class DataFrame:
Args:
path: Path of the Parquet file to write.
options: Sets the writer parquet options (see
`ParquetWriterOptions`).
+ write_options: Options that impact how the DataFrame is written.
"""
options_internal = ParquetWriterOptionsInternal(
options.data_pagesize_limit,
@@ -1060,19 +1093,45 @@ class DataFrame:
bloom_filter_ndv=opts.bloom_filter_ndv,
)
+ raw_write_options = (
+ write_options._raw_write_options if write_options is not None else None
+ )
self.df.write_parquet_with_options(
str(path),
options_internal,
column_specific_options_internal,
+ raw_write_options,
)
- def write_json(self, path: str | pathlib.Path) -> None:
+ def write_json(
+ self,
+ path: str | pathlib.Path,
+ write_options: DataFrameWriteOptions | None = None,
+ ) -> None:
"""Execute the :py:class:`DataFrame` and write the results to a JSON
file.
Args:
path: Path of the JSON file to write.
+ write_options: Options that impact how the DataFrame is written.
+ """
+ raw_write_options = (
+ write_options._raw_write_options if write_options is not None else None
+ )
+ self.df.write_json(str(path), write_options=raw_write_options)
+
+ def write_table(
+ self, table_name: str, write_options: DataFrameWriteOptions | None = None
+ ) -> None:
+ """Execute the :py:class:`DataFrame` and write the results to a table.
+
+ The table must be registered with the session to perform this
operation.
+ Not all table providers support writing operations. See the individual
+ implementations for details.
"""
- self.df.write_json(str(path))
+ raw_write_options = (
+ write_options._raw_write_options if write_options is not None else None
+ )
+ self.df.write_table(table_name, raw_write_options)
def to_arrow_table(self) -> pa.Table:
"""Execute the :py:class:`DataFrame` and convert it into an Arrow
Table.
@@ -1220,3 +1279,49 @@ class DataFrame:
- For columns not in subset, the original column is kept unchanged
"""
return DataFrame(self.df.fill_null(value, subset))
+
+
+class InsertOp(Enum):
+ """Insert operation mode.
+
+ These modes are used by the table writing feature to define how record
+ batches should be written to a table.
+ """
+
+ APPEND = InsertOpInternal.APPEND
+ """Appends new rows to the existing table without modifying any existing
rows."""
+
+ REPLACE = InsertOpInternal.REPLACE
+ """Replace existing rows that collide with the inserted rows.
+
+ Replacement is typically based on a unique key or primary key.
+ """
+
+ OVERWRITE = InsertOpInternal.OVERWRITE
+ """Overwrites all existing rows in the table with the new rows."""
+
+
+class DataFrameWriteOptions:
+ """Writer options for DataFrame.
+
+ There is no guarantee the table provider supports all writer options.
+ See the individual implementation and documentation for details.
+ """
+
+ def __init__(
+ self,
+ insert_operation: InsertOp | None = None,
+ single_file_output: bool = False,
+ partition_by: str | Sequence[str] | None = None,
+ sort_by: Expr | SortExpr | Sequence[Expr] | Sequence[SortExpr] | None = None,
+ ) -> None:
+ """Instantiate writer options for DataFrame."""
+ if isinstance(partition_by, str):
+ partition_by = [partition_by]
+
+ sort_by_raw = sort_list_to_raw_sort_list(sort_by)
+ insert_op = insert_operation.value if insert_operation is not None else None
+
+ self._raw_write_options = DataFrameWriteOptionsInternal(
+ insert_op, single_file_output, partition_by, sort_by_raw
+ )
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index eb686dd1..cd85221c 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -16,6 +16,7 @@
# under the License.
import ctypes
import datetime
+import itertools
import os
import re
import threading
@@ -27,6 +28,7 @@ import pyarrow.parquet as pq
import pytest
from datafusion import (
DataFrame,
+ InsertOp,
ParquetColumnOptions,
ParquetWriterOptions,
SessionContext,
@@ -40,6 +42,7 @@ from datafusion import (
from datafusion import (
functions as f,
)
+from datafusion.dataframe import DataFrameWriteOptions
from datafusion.dataframe_formatter import (
DataFrameHtmlFormatter,
configure_formatter,
@@ -58,9 +61,7 @@ def ctx():
@pytest.fixture
-def df():
- ctx = SessionContext()
-
+def df(ctx):
# create a RecordBatch and a new DataFrame from it
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6]), pa.array([8, 5, 8])],
@@ -1830,6 +1831,69 @@ def test_write_csv(ctx, df, tmp_path, path_to_str):
assert result == expected
+def generate_test_write_params() -> list[tuple]:
+ # Overwrite and Replace are not implemented for many table writers
+ insert_ops = [InsertOp.APPEND, None]
+ sort_by_cases = [
+ (None, [1, 2, 3], "unsorted"),
+ (column("c"), [2, 1, 3], "single_column_expr"),
+ (column("a").sort(ascending=False), [3, 2, 1], "single_sort_expr"),
+ ([column("c"), column("b")], [2, 1, 3], "list_col_expr"),
+ (
+ [column("c").sort(ascending=False), column("b").sort(ascending=False)],
+ [3, 1, 2],
+ "list_sort_expr",
+ ),
+ ]
+
+ formats = ["csv", "json", "parquet", "table"]
+
+ return [
+ pytest.param(
+ output_format,
+ insert_op,
+ sort_by,
+ expected_a,
+ id=f"{output_format}_{test_id}",
+ )
+ for output_format, insert_op, (
+ sort_by,
+ expected_a,
+ test_id,
+ ) in itertools.product(formats, insert_ops, sort_by_cases)
+ ]
+
+
[email protected](
+ ("output_format", "insert_op", "sort_by", "expected_a"),
+ generate_test_write_params(),
+)
+def test_write_files_with_options(
+ ctx, df, tmp_path, output_format, insert_op, sort_by, expected_a
+) -> None:
+ write_options = DataFrameWriteOptions(insert_operation=insert_op, sort_by=sort_by)
+
+ if output_format == "csv":
+ df.write_csv(tmp_path, with_header=True, write_options=write_options)
+ ctx.register_csv("test_table", tmp_path)
+ elif output_format == "json":
+ df.write_json(tmp_path, write_options=write_options)
+ ctx.register_json("test_table", tmp_path)
+ elif output_format == "parquet":
+ df.write_parquet(tmp_path, write_options=write_options)
+ ctx.register_parquet("test_table", tmp_path)
+ elif output_format == "table":
+ batch = pa.RecordBatch.from_arrays([[], [], []], schema=df.schema())
+ ctx.register_record_batches("test_table", [[batch]])
+ ctx.table("test_table").show()
+ df.write_table("test_table", write_options=write_options)
+
+ result = ctx.table("test_table").to_pydict()["a"]
+ ctx.table("test_table").show()
+
+ assert result == expected_a
+
+
@pytest.mark.parametrize("path_to_str", [True, False])
def test_write_json(ctx, df, tmp_path, path_to_str):
path = str(tmp_path) if path_to_str else tmp_path
@@ -2322,6 +2386,25 @@ def test_write_parquet_options_error(df, tmp_path):
df.write_parquet(str(tmp_path), options, compression_level=1)
+def test_write_table(ctx, df):
+ batch = pa.RecordBatch.from_arrays(
+ [pa.array([1, 2, 3])],
+ names=["a"],
+ )
+
+ ctx.register_record_batches("t", [[batch]])
+
+ df = ctx.table("t").with_column("a", column("a") * literal(-1))
+
+ ctx.table("t").show()
+
+ df.write_table("t")
+ result = ctx.table("t").sort(column("a")).collect()[0][0].to_pylist()
+ expected = [-3, -2, -1, 1, 2, 3]
+
+ assert result == expected
+
+
def test_dataframe_export(df) -> None:
# Guarantees that we have the canonical implementation
# reading our dataframe export
diff --git a/src/dataframe.rs b/src/dataframe.rs
index bfdc35e1..c23c0c97 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -33,6 +33,8 @@ use datafusion::config::{CsvOptions, ParquetColumnOptions,
ParquetOptions, Table
use datafusion::dataframe::{DataFrame, DataFrameWriteOptions};
use datafusion::error::DataFusionError;
use datafusion::execution::SendableRecordBatchStream;
+use datafusion::logical_expr::dml::InsertOp;
+use datafusion::logical_expr::SortExpr;
use datafusion::parquet::basic::{BrotliLevel, Compression, GzipLevel,
ZstdLevel};
use datafusion::prelude::*;
use futures::{StreamExt, TryStreamExt};
@@ -723,18 +725,27 @@ impl PyDataFrame {
}
/// Write a `DataFrame` to a CSV file.
- fn write_csv(&self, path: &str, with_header: bool, py: Python) -> PyDataFusionResult<()> {
+ fn write_csv(
+ &self,
+ py: Python,
+ path: &str,
+ with_header: bool,
+ write_options: Option<PyDataFrameWriteOptions>,
+ ) -> PyDataFusionResult<()> {
let csv_options = CsvOptions {
has_header: Some(with_header),
..Default::default()
};
+ let write_options = write_options
+ .map(DataFrameWriteOptions::from)
+ .unwrap_or_default();
+
wait_for_future(
py,
- self.df.as_ref().clone().write_csv(
- path,
- DataFrameWriteOptions::new(),
- Some(csv_options),
- ),
+ self.df
+ .as_ref()
+ .clone()
+ .write_csv(path, write_options, Some(csv_options)),
)??;
Ok(())
}
@@ -743,13 +754,15 @@ impl PyDataFrame {
#[pyo3(signature = (
path,
compression="zstd",
- compression_level=None
+ compression_level=None,
+ write_options=None,
))]
fn write_parquet(
&self,
path: &str,
compression: &str,
compression_level: Option<u32>,
+ write_options: Option<PyDataFrameWriteOptions>,
py: Python,
) -> PyDataFusionResult<()> {
fn verify_compression_level(cl: Option<u32>) -> Result<u32, PyErr> {
@@ -788,14 +801,16 @@ impl PyDataFrame {
let mut options = TableParquetOptions::default();
options.global.compression = Some(compression_string);
+ let write_options = write_options
+ .map(DataFrameWriteOptions::from)
+ .unwrap_or_default();
wait_for_future(
py,
- self.df.as_ref().clone().write_parquet(
- path,
- DataFrameWriteOptions::new(),
- Option::from(options),
- ),
+ self.df
+ .as_ref()
+ .clone()
+ .write_parquet(path, write_options, Option::from(options)),
)??;
Ok(())
}
@@ -806,6 +821,7 @@ impl PyDataFrame {
path: &str,
options: PyParquetWriterOptions,
column_specific_options: HashMap<String, PyParquetColumnOptions>,
+ write_options: Option<PyDataFrameWriteOptions>,
py: Python,
) -> PyDataFusionResult<()> {
let table_options = TableParquetOptions {
@@ -816,12 +832,14 @@ impl PyDataFrame {
.collect(),
..Default::default()
};
-
+ let write_options = write_options
+ .map(DataFrameWriteOptions::from)
+ .unwrap_or_default();
wait_for_future(
py,
self.df.as_ref().clone().write_parquet(
path,
- DataFrameWriteOptions::new(),
+ write_options,
Option::from(table_options),
),
)??;
@@ -829,13 +847,40 @@ impl PyDataFrame {
}
/// Executes a query and writes the results to a partitioned JSON file.
- fn write_json(&self, path: &str, py: Python) -> PyDataFusionResult<()> {
+ fn write_json(
+ &self,
+ path: &str,
+ py: Python,
+ write_options: Option<PyDataFrameWriteOptions>,
+ ) -> PyDataFusionResult<()> {
+ let write_options = write_options
+ .map(DataFrameWriteOptions::from)
+ .unwrap_or_default();
+ wait_for_future(
+ py,
+ self.df
+ .as_ref()
+ .clone()
+ .write_json(path, write_options, None),
+ )??;
+ Ok(())
+ }
+
+ fn write_table(
+ &self,
+ py: Python,
+ table_name: &str,
+ write_options: Option<PyDataFrameWriteOptions>,
+ ) -> PyDataFusionResult<()> {
+ let write_options = write_options
+ .map(DataFrameWriteOptions::from)
+ .unwrap_or_default();
wait_for_future(
py,
self.df
.as_ref()
.clone()
- .write_json(path, DataFrameWriteOptions::new(), None),
+ .write_table(table_name, write_options),
)??;
Ok(())
}
@@ -974,6 +1019,67 @@ impl PyDataFrame {
}
}
+#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[pyclass(frozen, eq, eq_int, name = "InsertOp", module = "datafusion")]
+pub enum PyInsertOp {
+ APPEND,
+ REPLACE,
+ OVERWRITE,
+}
+
+impl From<PyInsertOp> for InsertOp {
+ fn from(value: PyInsertOp) -> Self {
+ match value {
+ PyInsertOp::APPEND => InsertOp::Append,
+ PyInsertOp::REPLACE => InsertOp::Replace,
+ PyInsertOp::OVERWRITE => InsertOp::Overwrite,
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+#[pyclass(frozen, name = "DataFrameWriteOptions", module = "datafusion")]
+pub struct PyDataFrameWriteOptions {
+ insert_operation: InsertOp,
+ single_file_output: bool,
+ partition_by: Vec<String>,
+ sort_by: Vec<SortExpr>,
+}
+
+impl From<PyDataFrameWriteOptions> for DataFrameWriteOptions {
+ fn from(value: PyDataFrameWriteOptions) -> Self {
+ DataFrameWriteOptions::new()
+ .with_insert_operation(value.insert_operation)
+ .with_single_file_output(value.single_file_output)
+ .with_partition_by(value.partition_by)
+ .with_sort_by(value.sort_by)
+ }
+}
+
+#[pymethods]
+impl PyDataFrameWriteOptions {
+ #[new]
+ fn new(
+ insert_operation: Option<PyInsertOp>,
+ single_file_output: bool,
+ partition_by: Option<Vec<String>>,
+ sort_by: Option<Vec<PySortExpr>>,
+ ) -> Self {
let insert_operation = insert_operation.map(Into::into).unwrap_or(InsertOp::Append);
+ let sort_by = sort_by
+ .unwrap_or_default()
+ .into_iter()
+ .map(Into::into)
+ .collect();
+ Self {
+ insert_operation,
+ single_file_output,
+ partition_by: partition_by.unwrap_or_default(),
+ sort_by,
+ }
+ }
+}
+
/// Print DataFrame
fn print_dataframe(py: Python, df: DataFrame) -> PyDataFusionResult<()> {
// Get string representation of record batches
diff --git a/src/lib.rs b/src/lib.rs
index 0361c731..4f816d88 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -87,6 +87,8 @@ fn _internal(py: Python, m: Bound<'_, PyModule>) ->
PyResult<()> {
m.add_class::<context::PySessionContext>()?;
m.add_class::<context::PySQLOptions>()?;
m.add_class::<dataframe::PyDataFrame>()?;
+ m.add_class::<dataframe::PyInsertOp>()?;
+ m.add_class::<dataframe::PyDataFrameWriteOptions>()?;
m.add_class::<dataframe::PyParquetColumnOptions>()?;
m.add_class::<dataframe::PyParquetWriterOptions>()?;
m.add_class::<udf::PyScalarUDF>()?;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]