Repository: arrow
Updated Branches:
  refs/heads/master ebe7dc8f5 -> b5de9e56d
ARROW-369: [Python] Convert multiple record batches at once to Pandas

Modifies the pandas adapter so that `ConvertColumnToPandas` handles
columns with multiple chunks. This extends the pyarrow public API with a
free function `dataframe_from_batches`, which takes a list of Arrow
RecordBatches with equal schemas and returns a pandas DataFrame. Adds
unit tests in test_table.py that exercise the conversion for each typed
column specialization.

Author: Bryan Cutler <cutl...@gmail.com>

Closes #216 from BryanCutler/multi-batch-toPandas-ARROW-369 and squashes
the following commits:

b6c9986 [Bryan Cutler] fixed formatting
edf056e [Bryan Cutler] simplified with pyarrow.schema.Schema.equals
068bc1b [Bryan Cutler] Merge remote-tracking branch 'upstream/master' into multi-batch-toPandas-ARROW-369
da65345 [Bryan Cutler] fixed test case for schema checking
9edb0ba [Bryan Cutler] used auto keyword where some typecasting was done in ConvertValues
bd2a720 [Bryan Cutler] added test case for schema not equal, disabled now
c3d7e8f [Bryan Cutler] Changed conversion to make Table from columns first; conversion is now just a free function
3ee51e6 [Bryan Cutler] cleanup
398b18d [Bryan Cutler] Fixed case for Integer specialization without nulls
7b29a55 [Bryan Cutler] Initial working version of RecordBatch list to_pandas, need more tests and cleanup

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b5de9e56
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b5de9e56
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b5de9e56

Branch: refs/heads/master
Commit: b5de9e56db08480050445dd883643448af12b81b
Parents: ebe7dc8
Author: Bryan Cutler <cutl...@gmail.com>
Authored: Fri Dec 2 14:34:47 2016 -0500
Committer: Wes McKinney <wes.mckin...@twosigma.com>
Committed: Fri Dec 2 14:34:47 2016 -0500

----------------------------------------------------------------------
 python/pyarrow/__init__.py            |   4 +-
 python/pyarrow/includes/libarrow.pxd  |   3 +
 python/pyarrow/table.pyx              |  47 +++++++
 python/pyarrow/tests/test_table.py    |  35 +++++
 python/src/pyarrow/adapters/pandas.cc | 209 ++++++++++++++++++-----------
 5 files changed, 219 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
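For a quick sense of the new API, here is a minimal usage sketch, mirroring
the unit test added below (the DataFrame contents are illustrative):

    import numpy as np
    import pandas as pd
    import pyarrow as pa

    df1 = pd.DataFrame({'c1': np.array([1, 1, 2], dtype='uint32')})
    df2 = pd.DataFrame({'c1': np.array([3, 5], dtype='uint32')})

    batch1 = pa.RecordBatch.from_pandas(df1)
    batch2 = pa.RecordBatch.from_pandas(df2)

    # All batches must have equal schemas, else ArrowException is raised
    df = pa.dataframe_from_batches([batch1, batch2])
    # df is equivalent to pd.concat([df1, df2], ignore_index=True)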
http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 775ce7e..d4d0f00 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -41,5 +41,7 @@ from pyarrow.schema import (null, bool_, list_, struct, field,
                             DataType, Field, Schema, schema)
 
-from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe
+from pyarrow.table import (Column, RecordBatch, dataframe_from_batches, Table,
+                           from_pandas_dataframe)
+
 from pyarrow.version import version as __version__

http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 19da408..350ebe3 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -158,6 +158,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         CColumn(const shared_ptr[CField]& field,
                 const shared_ptr[CArray]& data)
 
+        CColumn(const shared_ptr[CField]& field,
+                const vector[shared_ptr[CArray]]& chunks)
+
         int64_t length()
         int64_t null_count()
         const c_string& name()

http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/table.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx
index a6715b1..45cf7be 100644
--- a/python/pyarrow/table.pyx
+++ b/python/pyarrow/table.pyx
@@ -28,6 +28,7 @@ cimport pyarrow.includes.pyarrow as pyarrow
 import pyarrow.config
 
 from pyarrow.array cimport Array, box_arrow_array
+from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
 from pyarrow.schema cimport box_data_type, box_schema
 
@@ -414,6 +415,52 @@ cdef class RecordBatch:
 
         return result
 
+
+def dataframe_from_batches(batches):
+    """
+    Convert a list of Arrow RecordBatches to a pandas.DataFrame
+
+    Parameters
+    ----------
+
+    batches: list of RecordBatch
+        RecordBatch list to be converted, schemas must be equal
+    """
+
+    cdef:
+        vector[shared_ptr[CArray]] c_array_chunks
+        vector[shared_ptr[CColumn]] c_columns
+        shared_ptr[CTable] c_table
+        Array arr
+        Schema schema
+
+    import pandas as pd
+
+    schema = batches[0].schema
+
+    # check schemas are equal
+    if any((not schema.equals(other.schema) for other in batches[1:])):
+        raise ArrowException("Error converting list of RecordBatches to "
+                             "DataFrame, not all schemas are equal")
+
+    cdef int K = batches[0].num_columns
+
+    # create chunked columns from the batches
+    c_columns.resize(K)
+    for i in range(K):
+        for batch in batches:
+            arr = batch[i]
+            c_array_chunks.push_back(arr.sp_array)
+        c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i),
+                                       c_array_chunks))
+        c_array_chunks.clear()
+
+    # create a Table from columns and convert to DataFrame
+    c_table.reset(new CTable('', schema.sp_schema, c_columns))
+    table = Table()
+    table.init(c_table)
+    return table.to_pandas()
+
+
 cdef class Table:
     """
     A collection of top-level named, equal length Arrow arrays.
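In other words, the loop above builds, for each field, one chunked column
whose chunks are that field's array from every batch, in batch order. A
pure-Python sketch of the resulting chunk layout (illustrative only;
assemble_column_chunks is not part of the patch):

    def assemble_column_chunks(batches):
        # column i of the resulting Table is backed by the chunk list
        # [batches[0][i], batches[1][i], ...]
        num_columns = batches[0].num_columns
        return [[batch[i] for batch in batches]
                for i in range(num_columns)]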
http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/pyarrow/tests/test_table.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 4c9d302..dc4f37a 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -19,6 +19,7 @@ import numpy as np
 
 from pandas.util.testing import assert_frame_equal
 import pandas as pd
+import pytest
 
 import pyarrow as pa
 
@@ -50,6 +51,40 @@ def test_recordbatch_from_to_pandas():
 
     assert_frame_equal(data, result)
 
 
+def test_recordbatchlist_to_pandas():
+    data1 = pd.DataFrame({
+        'c1': np.array([1, 1, 2], dtype='uint32'),
+        'c2': np.array([1.0, 2.0, 3.0], dtype='float64'),
+        'c3': [True, None, False],
+        'c4': ['foo', 'bar', None]
+    })
+
+    data2 = pd.DataFrame({
+        'c1': np.array([3, 5], dtype='uint32'),
+        'c2': np.array([4.0, 5.0], dtype='float64'),
+        'c3': [True, True],
+        'c4': ['baz', 'qux']
+    })
+
+    batch1 = pa.RecordBatch.from_pandas(data1)
+    batch2 = pa.RecordBatch.from_pandas(data2)
+
+    result = pa.dataframe_from_batches([batch1, batch2])
+    data = pd.concat([data1, data2], ignore_index=True)
+
+    assert_frame_equal(data, result)
+
+
+def test_recordbatchlist_schema_equals():
+    data1 = pd.DataFrame({'c1': np.array([1], dtype='uint32')})
+    data2 = pd.DataFrame({'c1': np.array([4.0, 5.0], dtype='float64')})
+
+    batch1 = pa.RecordBatch.from_pandas(data1)
+    batch2 = pa.RecordBatch.from_pandas(data2)
+
+    with pytest.raises(pa.ArrowException):
+        pa.dataframe_from_batches([batch1, batch2])
+
+
 def test_table_basics():
     data = [
         pa.from_pylist(range(5)),
http://git-wip-us.apache.org/repos/asf/arrow/blob/b5de9e56/python/src/pyarrow/adapters/pandas.cc
----------------------------------------------------------------------
diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc
index 1f5b700..adb27e8 100644
--- a/python/src/pyarrow/adapters/pandas.cc
+++ b/python/src/pyarrow/adapters/pandas.cc
@@ -597,14 +597,10 @@ class ArrowDeserializer {
   Status Convert(PyObject** out) {
     const std::shared_ptr<arrow::ChunkedArray> data = col_->data();
 
-    if (data->num_chunks() > 1) {
-      return Status::NotImplemented("Chunked column conversion NYI");
-    }
-
-    auto chunk = data->chunk(0);
-    RETURN_NOT_OK(ConvertValues<TYPE>(chunk));
+    RETURN_NOT_OK(ConvertValues<TYPE>(data));
 
     *out = reinterpret_cast<PyObject*>(out_);
+
     return Status::OK();
   }
 
@@ -654,27 +650,48 @@ class ArrowDeserializer {
   }
 
   template <int T2>
+  Status ConvertValuesZeroCopy(std::shared_ptr<Array> arr) {
+    typedef typename arrow_traits<T2>::T T;
+
+    auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+    auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+
+    // Zero-Copy. We can pass the data pointer directly to NumPy.
+    void* data = const_cast<T*>(in_values);
+    int type = arrow_traits<TYPE>::npy_type;
+    RETURN_NOT_OK(OutputFromData(type, data));
+
+    return Status::OK();
+  }
+
+  template <int T2>
   inline typename std::enable_if<
     arrow_traits<T2>::is_pandas_numeric_nullable, Status>::type
-  ConvertValues(const std::shared_ptr<Array>& arr) {
+  ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
     typedef typename arrow_traits<T2>::T T;
+    size_t chunk_offset = 0;
 
-    arrow::PrimitiveArray* prim_arr = static_cast<arrow::PrimitiveArray*>(
-        arr.get());
-    const T* in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    if (data->num_chunks() == 1 && data->null_count() == 0) {
+      return ConvertValuesZeroCopy<TYPE>(data->chunk(0));
+    }
+
+    RETURN_NOT_OK(AllocateOutput(arrow_traits<T2>::npy_type));
 
-    if (arr->null_count() > 0) {
-      RETURN_NOT_OK(AllocateOutput(arrow_traits<T2>::npy_type));
+    for (int c = 0; c < data->num_chunks(); c++) {
+      const std::shared_ptr<Array> arr = data->chunk(c);
+      auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+      auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+      auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_)) + chunk_offset;
 
-      T* out_values = reinterpret_cast<T*>(PyArray_DATA(out_));
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        out_values[i] = arr->IsNull(i) ? arrow_traits<T2>::na_value : in_values[i];
+      if (arr->null_count() > 0) {
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          out_values[i] = arr->IsNull(i) ? arrow_traits<T2>::na_value : in_values[i];
+        }
+      } else {
+        memcpy(out_values, in_values, sizeof(T) * arr->length());
       }
-    } else {
-      // Zero-Copy. We can pass the data pointer directly to NumPy.
-      void* data = const_cast<T*>(in_values);
-      int type = arrow_traits<TYPE>::npy_type;
-      RETURN_NOT_OK(OutputFromData(type, data));
+
+      chunk_offset += arr->length();
    }
 
     return Status::OK();
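The strategy above: with a single chunk and no nulls, the Arrow buffer is
handed to NumPy zero-copy; otherwise one output array is allocated and each
chunk is copied at a running offset, with nulls patched to the type's NA
sentinel. A rough NumPy analogue of the copy path (a sketch only;
stitch_chunks and its boolean-mask null representation are assumptions, not
part of the patch):

    import numpy as np

    def stitch_chunks(chunks, masks, dtype, na_value):
        # chunks: list of value arrays; masks: parallel list of boolean
        # null masks (True = null), standing in for Arrow validity bitmaps
        total = sum(len(c) for c in chunks)
        out = np.empty(total, dtype=dtype)
        offset = 0
        for values, mask in zip(chunks, masks):
            view = out[offset:offset + len(values)]
            view[:] = values              # memcpy analogue
            if mask is not None:
                view[mask] = na_value     # IsNull(i) -> na_value analogue
            offset += len(values)
        return out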
@@ -684,27 +701,43 @@ class ArrowDeserializer {
   template <int T2>
   inline typename std::enable_if<
     arrow_traits<T2>::is_pandas_numeric_not_nullable, Status>::type
-  ConvertValues(const std::shared_ptr<Array>& arr) {
+  ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
     typedef typename arrow_traits<T2>::T T;
+    size_t chunk_offset = 0;
 
-    arrow::PrimitiveArray* prim_arr = static_cast<arrow::PrimitiveArray*>(
-        arr.get());
-
-    const T* in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+    if (data->num_chunks() == 1 && data->null_count() == 0) {
+      return ConvertValuesZeroCopy<TYPE>(data->chunk(0));
+    }
 
-    if (arr->null_count() > 0) {
+    if (data->null_count() > 0) {
       RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64));
 
-      // Upcast to double, set NaN as appropriate
-      double* out_values = reinterpret_cast<double*>(PyArray_DATA(out_));
-      for (int i = 0; i < arr->length(); ++i) {
-        out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i];
+      for (int c = 0; c < data->num_chunks(); c++) {
+        const std::shared_ptr<Array> arr = data->chunk(c);
+        auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+        auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+        // Upcast to double, set NaN as appropriate
+        auto out_values = reinterpret_cast<double*>(PyArray_DATA(out_)) + chunk_offset;
+
+        for (int i = 0; i < arr->length(); ++i) {
+          out_values[i] = prim_arr->IsNull(i) ? NAN : in_values[i];
+        }
+
+        chunk_offset += arr->length();
       }
     } else {
-      // Zero-Copy. We can pass the data pointer directly to NumPy.
-      void* data = const_cast<T*>(in_values);
-      int type = arrow_traits<TYPE>::npy_type;
-      RETURN_NOT_OK(OutputFromData(type, data));
+      RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
+
+      for (int c = 0; c < data->num_chunks(); c++) {
+        const std::shared_ptr<Array> arr = data->chunk(c);
+        auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get());
+        auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data());
+        auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_)) + chunk_offset;
+
+        memcpy(out_values, in_values, sizeof(T) * arr->length());
+
+        chunk_offset += arr->length();
+      }
     }
 
     return Status::OK();
   }
 
@@ -714,35 +747,48 @@ class ArrowDeserializer {
   template <int T2>
   inline typename std::enable_if<
     arrow_traits<T2>::is_boolean, Status>::type
-  ConvertValues(const std::shared_ptr<Array>& arr) {
+  ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
+    size_t chunk_offset = 0;
     PyAcquireGIL lock;
 
-    arrow::BooleanArray* bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
-
-    if (arr->null_count() > 0) {
+    if (data->null_count() > 0) {
       RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
 
-      PyObject** out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        if (bool_arr->IsNull(i)) {
-          Py_INCREF(Py_None);
-          out_values[i] = Py_None;
-        } else if (bool_arr->Value(i)) {
-          // True
-          Py_INCREF(Py_True);
-          out_values[i] = Py_True;
-        } else {
-          // False
-          Py_INCREF(Py_False);
-          out_values[i] = Py_False;
+      for (int c = 0; c < data->num_chunks(); c++) {
+        const std::shared_ptr<Array> arr = data->chunk(c);
+        auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
+        auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)) + chunk_offset;
+
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          if (bool_arr->IsNull(i)) {
+            Py_INCREF(Py_None);
+            out_values[i] = Py_None;
+          } else if (bool_arr->Value(i)) {
+            // True
+            Py_INCREF(Py_True);
+            out_values[i] = Py_True;
+          } else {
+            // False
+            Py_INCREF(Py_False);
+            out_values[i] = Py_False;
+          }
         }
+
+        chunk_offset += bool_arr->length();
       }
     } else {
       RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type));
 
-      uint8_t* out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(out_));
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        out_values[i] = static_cast<uint8_t>(bool_arr->Value(i));
+      for (int c = 0; c < data->num_chunks(); c++) {
+        const std::shared_ptr<Array> arr = data->chunk(c);
+        auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get());
+        auto out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(out_)) + chunk_offset;
+
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          out_values[i] = static_cast<uint8_t>(bool_arr->Value(i));
+        }
+
+        chunk_offset += bool_arr->length();
      }
    }
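As the two hunks above imply, the output dtype depends on the column's null
count: numeric columns containing nulls are upcast to float64 with NaN, and
boolean columns containing nulls come back as object arrays of True/False/
None (otherwise plain uint8/bool storage). A small illustration using the
new function (variable names are illustrative, not from the patch):

    import pandas as pd
    import pyarrow as pa

    batch = pa.RecordBatch.from_pandas(
        pd.DataFrame({'c3': [True, None, False]}))
    df = pa.dataframe_from_batches([batch, batch])
    # 'c3' contains a null, so the NPY_OBJECT branch above is taken and
    # df['c3'] holds Python True/False/None objects, not numpy bools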
@@ -753,42 +799,49 @@ class ArrowDeserializer {
   template <int T2>
   inline typename std::enable_if<
     T2 == arrow::Type::STRING, Status>::type
-  ConvertValues(const std::shared_ptr<Array>& arr) {
+  ConvertValues(const std::shared_ptr<arrow::ChunkedArray>& data) {
+    size_t chunk_offset = 0;
     PyAcquireGIL lock;
 
     RETURN_NOT_OK(AllocateOutput(NPY_OBJECT));
 
-    PyObject** out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_));
-
-    arrow::StringArray* string_arr = static_cast<arrow::StringArray*>(arr.get());
-
-    const uint8_t* data;
-    int32_t length;
-    if (arr->null_count() > 0) {
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        if (string_arr->IsNull(i)) {
-          Py_INCREF(Py_None);
-          out_values[i] = Py_None;
-        } else {
-          data = string_arr->GetValue(i, &length);
-
-          out_values[i] = make_pystring(data, length);
+    for (int c = 0; c < data->num_chunks(); c++) {
+      const std::shared_ptr<Array> arr = data->chunk(c);
+      auto string_arr = static_cast<arrow::StringArray*>(arr.get());
+      auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)) + chunk_offset;
+
+      const uint8_t* data_ptr;
+      int32_t length;
+      if (data->null_count() > 0) {
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          if (string_arr->IsNull(i)) {
+            Py_INCREF(Py_None);
+            out_values[i] = Py_None;
+          } else {
+            data_ptr = string_arr->GetValue(i, &length);
+
+            out_values[i] = make_pystring(data_ptr, length);
+            if (out_values[i] == nullptr) {
+              return Status::UnknownError("String initialization failed");
+            }
+          }
+        }
+      } else {
+        for (int64_t i = 0; i < arr->length(); ++i) {
+          data_ptr = string_arr->GetValue(i, &length);
+          out_values[i] = make_pystring(data_ptr, length);
           if (out_values[i] == nullptr) {
             return Status::UnknownError("String initialization failed");
           }
         }
       }
-    } else {
-      for (int64_t i = 0; i < arr->length(); ++i) {
-        data = string_arr->GetValue(i, &length);
-        out_values[i] = make_pystring(data, length);
-        if (out_values[i] == nullptr) {
-          return Status::UnknownError("String initialization failed");
-        }
-      }
+
+      chunk_offset += string_arr->length();
     }
 
     return Status::OK();
   }
 
+
  private:
   std::shared_ptr<Column> col_;
   PyObject* py_ref_;