This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new f1b3029d Add function collect_column to dataframe (#1302)
f1b3029d is described below
commit f1b3029db6443c20e6f4c5d2ed0cc7b4217ce256
Author: Tim Saucer <[email protected]>
AuthorDate: Thu Nov 13 11:33:30 2025 -0500
Add function collect_column to dataframe (#1302)
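For context, a minimal sketch of how the new API can be used (a hedged example; the SessionContext setup, column name, and values below are illustrative, not part of the commit):

    from datafusion import SessionContext

    ctx = SessionContext()
    df = ctx.from_pydict({"age": [30, 40, 50]})

    # Executes the plan projected to the single requested column and
    # returns the results as one PyArrow array.
    arr = df.collect_column("age")
    print(arr)  # [30, 40, 50]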
---
Cargo.lock | 1 +
Cargo.toml | 1 +
docs/source/user-guide/dataframe/index.rst | 7 +++++--
python/datafusion/dataframe.py | 4 ++++
python/tests/test_dataframe.py | 12 ++++++++++++
src/dataframe.rs | 26 +++++++++++++++++++++++++-
6 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 2e345e71..a291189f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1596,6 +1596,7 @@ name = "datafusion-python"
version = "50.1.0"
dependencies = [
"arrow",
+ "arrow-select",
"async-trait",
"cstr",
"datafusion",
diff --git a/Cargo.toml b/Cargo.toml
index 3b7a4caa..1e8c3366 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -56,6 +56,7 @@ pyo3 = { version = "0.25", features = [
pyo3-async-runtimes = { version = "0.25", features = ["tokio-runtime"] }
pyo3-log = "0.12.4"
arrow = { version = "56", features = ["pyarrow"] }
+arrow-select = { version = "56" }
datafusion = { version = "50", features = ["avro", "unicode_expressions"] }
datafusion-substrait = { version = "50", optional = true }
datafusion-proto = { version = "50" }
diff --git a/docs/source/user-guide/dataframe/index.rst b/docs/source/user-guide/dataframe/index.rst
index 659589cf..510bcbc6 100644
--- a/docs/source/user-guide/dataframe/index.rst
+++ b/docs/source/user-guide/dataframe/index.rst
@@ -200,6 +200,9 @@ To materialize the results of your DataFrame operations:
# Count rows
count = df.count()
+ # Collect a single column of data as a PyArrow Array
+ arr = df.collect_column("age")
+
Zero-copy streaming to Arrow-based Python libraries
---------------------------------------------------
@@ -238,7 +241,7 @@ PyArrow:
Each batch exposes ``to_pyarrow()``, allowing conversion to a PyArrow
table. ``pa.table(df)`` collects the entire DataFrame eagerly into a
-PyArrow table::
+PyArrow table:
.. code-block:: python
@@ -246,7 +249,7 @@ PyArrow table::
table = pa.table(df)
Asynchronous iteration is supported as well, allowing integration with
-``asyncio`` event loops::
+``asyncio`` event loops:
.. code-block:: python
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index b3b48e96..12f1145a 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -728,6 +728,10 @@ class DataFrame:
"""
return self.df.collect()
+ def collect_column(self, column_name: str) -> pa.Array | pa.ChunkedArray:
+ """Executes this :py:class:`DataFrame` for a single column."""
+ return self.df.collect_column(column_name)
+
def cache(self) -> DataFrame:
"""Cache the DataFrame as a memory table.
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index 8292e258..48615916 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -1745,6 +1745,18 @@ def test_collect_partitioned():
assert [[batch]] == ctx.create_dataframe([[batch]]).collect_partitioned()
+def test_collect_column(ctx: SessionContext):
+ batch_1 = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
+ batch_2 = pa.RecordBatch.from_pydict({"a": [4, 5, 6]})
+ batch_3 = pa.RecordBatch.from_pydict({"a": [7, 8, 9]})
+
+ ctx.register_record_batches("t", [[batch_1, batch_2], [batch_3]])
+
+ result = ctx.table("t").sort(column("a")).collect_column("a")
+ expected = pa.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
+ assert result == expected
+
+
def test_union(ctx):
batch = pa.RecordBatch.from_arrays(
[pa.array([1, 2, 3]), pa.array([4, 5, 6])],
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 187bb0ac..9071d580 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -20,7 +20,7 @@ use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::sync::Arc;
-use arrow::array::{new_null_array, RecordBatch, RecordBatchReader};
+use arrow::array::{new_null_array, Array, ArrayRef, RecordBatch, RecordBatchReader};
use arrow::compute::can_cast_types;
use arrow::error::ArrowError;
use arrow::ffi::FFI_ArrowSchema;
@@ -343,6 +343,23 @@ impl PyDataFrame {
Ok(html_str)
}
+
+ async fn collect_column_inner(&self, column: &str) -> Result<ArrayRef, DataFusionError> {
+ let batches = self
+ .df
+ .as_ref()
+ .clone()
+ .select_columns(&[column])?
+ .collect()
+ .await?;
+
+ let arrays = batches
+ .iter()
+ .map(|b| b.column(0).as_ref())
+ .collect::<Vec<_>>();
+
+ arrow_select::concat::concat(&arrays).map_err(Into::into)
+ }
}
/// Synchronous wrapper around partitioned [`SendableRecordBatchStream`]s used
@@ -610,6 +627,13 @@ impl PyDataFrame {
.collect()
}
+ fn collect_column(&self, py: Python, column: &str) -> PyResult<PyObject> {
+ wait_for_future(py, self.collect_column_inner(column))?
+ .map_err(PyDataFusionError::from)?
+ .to_data()
+ .to_pyarrow(py)
+ }
+
/// Print the result, 20 lines by default
#[pyo3(signature = (num=20))]
fn show(&self, py: Python, num: usize) -> PyDataFusionResult<()> {
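For readers comparing against the existing Python surface, the new method behaves roughly like the following pre-existing pattern (a sketch, not the actual implementation; the Rust code above additionally prunes the plan via select_columns before executing and concatenates with arrow-select):

    import pyarrow as pa

    def collect_column_equivalent(df, column_name: str) -> pa.Array:
        # Project to the single column, execute, and concatenate the
        # per-batch arrays into one contiguous array, mirroring the
        # arrow_select::concat::concat call on the Rust side.
        batches = df.select(column_name).collect()
        return pa.concat_arrays([b.column(0) for b in batches])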