kylebarron commented on code in PR #825:
URL: https://github.com/apache/datafusion-python/pull/825#discussion_r1722533590
##########
python/datafusion/dataframe.py:
##########
@@ -524,3 +524,19 @@ def unnest_columns(self, *columns: str, preserve_nulls:
bool = True) -> DataFram
"""
columns = [c for c in columns]
return DataFrame(self.df.unnest_columns(columns,
preserve_nulls=preserve_nulls))
+
+ def __arrow_c_stream__(self, requested_schema: pa.Schema) -> Any:
+ """Export an Arrow PyCapsule Stream.
+
+ This will execute and collect the DataFrame. We will attempt to
respect the
+ requested schema, but only trivial transformations will be applied
such as only
+ returning the fields listed in the requested schema if their data
types match
+ those in the DataFrame.
Review Comment:
It doesn't look like any transformations are currently applied?
##########
src/context.rs:
##########
@@ -471,18 +474,35 @@ impl PySessionContext {
name: Option<&str>,
py: Python,
) -> PyResult<PyDataFrame> {
- // Instantiate pyarrow Table object & convert to batches
- let table = data.call_method0("to_batches")?;
+ let mut batches = None;
+ let mut schema = None;
- let schema = data.getattr("schema")?;
- let schema = schema.extract::<PyArrowType<Schema>>()?;
+ if let Ok(stream_reader) =
ArrowArrayStreamReader::from_pyarrow_bound(&data) {
+ // Works for any object that implements __arrow_c_stream__ in
pycapsule.
+
+ schema = Some(stream_reader.schema().as_ref().to_owned());
+ batches = Some(stream_reader.filter_map(|v| v.ok()).collect());
+ } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
+ // While this says RecordBatch, it will work for any object that
implements
+ // __arrow_c_array__ in pycapsule.
Review Comment:
```suggestion
// While this says RecordBatch, it will work for any object that
implements
// __arrow_c_array__ and returns a StructArray.
```
as a technical matter, other types of arrays also implement
`__arrow_c_array__`, but importing it to `RecordBatch` will fail on non-struct
types.
##########
src/context.rs:
##########
@@ -471,18 +474,35 @@ impl PySessionContext {
name: Option<&str>,
py: Python,
) -> PyResult<PyDataFrame> {
- // Instantiate pyarrow Table object & convert to batches
- let table = data.call_method0("to_batches")?;
+ let mut batches = None;
+ let mut schema = None;
- let schema = data.getattr("schema")?;
- let schema = schema.extract::<PyArrowType<Schema>>()?;
+ if let Ok(stream_reader) =
ArrowArrayStreamReader::from_pyarrow_bound(&data) {
+ // Works for any object that implements __arrow_c_stream__ in
pycapsule.
+
+ schema = Some(stream_reader.schema().as_ref().to_owned());
+ batches = Some(stream_reader.filter_map(|v| v.ok()).collect());
+ } else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
+ // While this says RecordBatch, it will work for any object that
implements
+ // __arrow_c_array__ in pycapsule.
+
+ schema = Some(array.schema().as_ref().to_owned());
+ batches = Some(vec![array]);
+ }
+
+ if batches.is_none() || schema.is_none() {
+ return Err(PyTypeError::new_err(
+ "Expected either a Arrow Array or Arrow Stream in
from_arrow_table().",
+ ));
+ }
+
+ let batches = batches.unwrap();
+ let schema = schema.unwrap();
Review Comment:
I'd personally opt for a different control flow that I find slightly
clearer, but just a nit:
```suggestion
let (schema, batches) = if let Ok(stream_reader) =
ArrowArrayStreamReader::from_pyarrow_bound(&data) {
// Works for any object that implements __arrow_c_stream__ in
pycapsule.
schema = Some(stream_reader.schema().as_ref().to_owned());
batches = Some(stream_reader.filter_map(|v| v.ok()).collect());
(schema, batches)
} else if let Ok(array) = RecordBatch::from_pyarrow_bound(&data) {
// While this says RecordBatch, it will work for any object that
implements
// __arrow_c_array__ in pycapsule.
schema = Some(array.schema().as_ref().to_owned());
batches = Some(vec![array]);
(schema, batches)
} else {
return Err(PyTypeError::new_err(
"Expected either a Arrow Array or Arrow Stream in
from_arrow_table().",
));
}
```
##########
src/context.rs:
##########
@@ -471,18 +474,35 @@ impl PySessionContext {
name: Option<&str>,
py: Python,
) -> PyResult<PyDataFrame> {
- // Instantiate pyarrow Table object & convert to batches
- let table = data.call_method0("to_batches")?;
+ let mut batches = None;
+ let mut schema = None;
- let schema = data.getattr("schema")?;
- let schema = schema.extract::<PyArrowType<Schema>>()?;
+ if let Ok(stream_reader) =
ArrowArrayStreamReader::from_pyarrow_bound(&data) {
+ // Works for any object that implements __arrow_c_stream__ in
pycapsule.
+
+ schema = Some(stream_reader.schema().as_ref().to_owned());
+ batches = Some(stream_reader.filter_map(|v| v.ok()).collect());
Review Comment:
I don't think we should ignore errors here. I'd suggest returning an error
if one of these batches errored.
##########
src/dataframe.rs:
##########
@@ -451,6 +454,26 @@ impl PyDataFrame {
Ok(table)
}
+ #[allow(unused_variables)]
+ fn __arrow_c_stream__<'py>(
+ &'py mut self,
+ py: Python<'py>,
+ requested_schema: Option<Bound<'py, PyCapsule>>,
+ ) -> PyResult<Bound<'py, PyCapsule>> {
+ let batches = wait_for_future(py, self.df.as_ref().clone().collect())?
+ .into_iter()
+ .map(|r| Ok(r));
Review Comment:
It would be cool if we didn't have to collect all the batches ahead of time,
but could synchronously collect one batch at a time within the iterator. I have
no idea if datafusion would support that though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]