This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 40dd9cc25a46aa56a5d852fbc8ebdbc55b5fe8d6 Author: Antoine Pitrou <[email protected]> AuthorDate: Tue Jan 30 10:16:51 2018 -0500 ARROW-2033: [Python] Fix pa.array() with iterator input Iterator (not iterable) input was broken with pa.array() unless both type and size were explicitly passed. Author: Wes McKinney <[email protected]> Author: Antoine Pitrou <[email protected]> Closes #1513 from pitrou/ARROW-2033-pa-array-iterator and squashes the following commits: 0013889a [Wes McKinney] Code review comments dc95be29 [Antoine Pitrou] Fix pyarrow.array with iterator input Change-Id: I930c3309e3fde12e65ede066b47985f67f7f4037 --- cpp/src/arrow/python/builtin_convert.cc | 174 ++++++++++++++------------- cpp/src/arrow/python/builtin_convert.h | 18 ++- python/pyarrow/array.pxi | 15 ++- python/pyarrow/includes/libarrow.pxd | 13 +- python/pyarrow/tests/test_convert_builtin.py | 19 +++ 5 files changed, 141 insertions(+), 98 deletions(-) diff --git a/cpp/src/arrow/python/builtin_convert.cc b/cpp/src/arrow/python/builtin_convert.cc index f7a370c..b41c55d 100644 --- a/cpp/src/arrow/python/builtin_convert.cc +++ b/cpp/src/arrow/python/builtin_convert.cc @@ -172,38 +172,26 @@ class SeqVisitor { Status Visit(PyObject* obj, int level = 0) { max_nesting_level_ = std::max(max_nesting_level_, level); - // Loop through either a sequence or an iterator. - if (PySequence_Check(obj)) { - Py_ssize_t size = PySequence_Size(obj); - for (int64_t i = 0; i < size; ++i) { - OwnedRef ref; - if (PyArray_Check(obj)) { - auto array = reinterpret_cast<PyArrayObject*>(obj); - auto ptr = reinterpret_cast<const char*>(PyArray_GETPTR1(array, i)); - - ref.reset(PyArray_GETITEM(array, ptr)); - RETURN_IF_PYERROR(); + // Loop through a sequence + if (!PySequence_Check(obj)) + return Status::TypeError("Object is not a sequence or iterable"); - RETURN_NOT_OK(VisitElem(ref, level)); - } else { - ref.reset(PySequence_GetItem(obj, i)); - RETURN_IF_PYERROR(); - RETURN_NOT_OK(VisitElem(ref, level)); - } - } - } else if (PyObject_HasAttrString(obj, "__iter__")) { - OwnedRef iter(PyObject_GetIter(obj)); - RETURN_IF_PYERROR(); + Py_ssize_t size = PySequence_Size(obj); + for (int64_t i = 0; i < size; ++i) { + OwnedRef ref; + if (PyArray_Check(obj)) { + auto array = reinterpret_cast<PyArrayObject*>(obj); + auto ptr = reinterpret_cast<const char*>(PyArray_GETPTR1(array, i)); - PyObject* item = NULLPTR; - while ((item = PyIter_Next(iter.obj()))) { + ref.reset(PyArray_GETITEM(array, ptr)); RETURN_IF_PYERROR(); - OwnedRef ref(item); + RETURN_NOT_OK(VisitElem(ref, level)); + } else { + ref.reset(PySequence_GetItem(obj, i)); + RETURN_IF_PYERROR(); RETURN_NOT_OK(VisitElem(ref, level)); } - } else { - return Status::TypeError("Object is not a sequence or iterable"); } return Status::OK(); } @@ -285,25 +273,45 @@ class SeqVisitor { } }; -Status InferArrowSize(PyObject* obj, int64_t* size) { +// Convert *obj* to a sequence if necessary +// Fill *size* to its length. If >= 0 on entry, *size* is an upper size +// bound that may lead to truncation. +Status ConvertToSequenceAndInferSize(PyObject* obj, PyObject** seq, int64_t* size) { if (PySequence_Check(obj)) { - *size = static_cast<int64_t>(PySequence_Size(obj)); - } else if (PyObject_HasAttrString(obj, "__iter__")) { + // obj is already a sequence + int64_t real_size = static_cast<int64_t>(PySequence_Size(obj)); + if (*size < 0) { + *size = real_size; + } else { + *size = std::min(real_size, *size); + } + Py_INCREF(obj); + *seq = obj; + } else if (*size < 0) { + // unknown size, exhaust iterator + *seq = PySequence_List(obj); + RETURN_IF_PYERROR(); + *size = static_cast<int64_t>(PyList_GET_SIZE(*seq)); + } else { + // size is known but iterator could be infinite + Py_ssize_t i, n = *size; PyObject* iter = PyObject_GetIter(obj); + RETURN_IF_PYERROR(); OwnedRef iter_ref(iter); - *size = 0; - PyObject* item; - while ((item = PyIter_Next(iter))) { - OwnedRef item_ref(item); - *size += 1; + PyObject* lst = PyList_New(n); + RETURN_IF_PYERROR(); + for (i = 0; i < n; i++) { + PyObject* item = PyIter_Next(iter); + if (!item) break; + PyList_SET_ITEM(lst, i, item); } - } else { - return Status::TypeError("Object is not a sequence or iterable"); - } - if (PyErr_Occurred()) { - // Not a sequence - PyErr_Clear(); - return Status::TypeError("Object is not a sequence or iterable"); + // Shrink list if len(iterator) < size + if (i < n && PyList_SetSlice(lst, i, n, NULL)) { + Py_DECREF(lst); + return Status::UnknownError("failed to resize list"); + } + *seq = lst; + *size = std::min<int64_t>(i, *size); } return Status::OK(); } @@ -325,7 +333,10 @@ Status InferArrowType(PyObject* obj, std::shared_ptr<DataType>* out_type) { Status InferArrowTypeAndSize(PyObject* obj, int64_t* size, std::shared_ptr<DataType>* out_type) { - RETURN_NOT_OK(InferArrowSize(obj, size)); + if (!PySequence_Check(obj)) { + return Status::TypeError("Object is not a sequence"); + } + *size = static_cast<int64_t>(PySequence_Size(obj)); // For 0-length sequences, refuse to guess if (*size == 0) { @@ -382,27 +393,8 @@ class TypedConverterVisitor : public TypedConverter<BuilderType> { RETURN_NOT_OK(static_cast<Derived*>(this)->AppendItem(ref)); } } - } else if (PyObject_HasAttrString(obj, "__iter__")) { - PyObject* iter = PyObject_GetIter(obj); - OwnedRef iter_ref(iter); - PyObject* item; - int64_t i = 0; - // To allow people with long generators to only convert a subset, stop - // consuming at size. - while ((item = PyIter_Next(iter)) && i < size) { - OwnedRef ref(item); - if (ref.obj() == Py_None) { - RETURN_NOT_OK(this->typed_builder_->AppendNull()); - } else { - RETURN_NOT_OK(static_cast<Derived*>(this)->AppendItem(ref)); - } - ++i; - } - if (size != i) { - RETURN_NOT_OK(this->typed_builder_->Resize(i)); - } } else { - return Status::TypeError("Object is not a sequence or iterable"); + return Status::TypeError("Object is not a sequence"); } return Status::OK(); } @@ -830,38 +822,56 @@ Status AppendPySequence(PyObject* obj, int64_t size, return converter->AppendData(obj, size); } -Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out) { +static Status ConvertPySequenceReal(PyObject* obj, int64_t size, + const std::shared_ptr<DataType>* type, + MemoryPool* pool, std::shared_ptr<Array>* out) { PyAcquireGIL lock; - std::shared_ptr<DataType> type; - int64_t size; - RETURN_NOT_OK(InferArrowTypeAndSize(obj, &size, &type)); - return ConvertPySequence(obj, pool, out, type, size); -} -Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out, - const std::shared_ptr<DataType>& type, int64_t size) { - PyAcquireGIL lock; + PyObject* seq; + ScopedRef tmp_seq_nanny; + + std::shared_ptr<DataType> real_type; + + RETURN_NOT_OK(ConvertToSequenceAndInferSize(obj, &seq, &size)); + tmp_seq_nanny.reset(seq); + if (type == nullptr) { + RETURN_NOT_OK(InferArrowType(seq, &real_type)); + } else { + real_type = *type; + } + DCHECK_GE(size, 0); + // Handle NA / NullType case - if (type->id() == Type::NA) { + if (real_type->id() == Type::NA) { out->reset(new NullArray(size)); return Status::OK(); } // Give the sequence converter an array builder std::unique_ptr<ArrayBuilder> builder; - RETURN_NOT_OK(MakeBuilder(pool, type, &builder)); - RETURN_NOT_OK(AppendPySequence(obj, size, type, builder.get())); + RETURN_NOT_OK(MakeBuilder(pool, real_type, &builder)); + RETURN_NOT_OK(AppendPySequence(seq, size, real_type, builder.get())); return builder->Finish(out); } -Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out, - const std::shared_ptr<DataType>& type) { - int64_t size; - { - PyAcquireGIL lock; - RETURN_NOT_OK(InferArrowSize(obj, &size)); - } - return ConvertPySequence(obj, pool, out, type, size); +Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out) { + return ConvertPySequenceReal(obj, -1, nullptr, pool, out); +} + +Status ConvertPySequence(PyObject* obj, const std::shared_ptr<DataType>& type, + MemoryPool* pool, std::shared_ptr<Array>* out) { + return ConvertPySequenceReal(obj, -1, &type, pool, out); +} + +Status ConvertPySequence(PyObject* obj, int64_t size, MemoryPool* pool, + std::shared_ptr<Array>* out) { + return ConvertPySequenceReal(obj, size, nullptr, pool, out); +} + +Status ConvertPySequence(PyObject* obj, int64_t size, + const std::shared_ptr<DataType>& type, MemoryPool* pool, + std::shared_ptr<Array>* out) { + return ConvertPySequenceReal(obj, size, &type, pool, out); } Status CheckPythonBytesAreFixedLength(PyObject* obj, Py_ssize_t expected_length) { diff --git a/cpp/src/arrow/python/builtin_convert.h b/cpp/src/arrow/python/builtin_convert.h index cde7a1b..4bd3f08 100644 --- a/cpp/src/arrow/python/builtin_convert.h +++ b/cpp/src/arrow/python/builtin_convert.h @@ -39,11 +39,11 @@ class Status; namespace py { +// These three functions take a sequence input, not arbitrary iterables ARROW_EXPORT arrow::Status InferArrowType(PyObject* obj, std::shared_ptr<arrow::DataType>* out_type); ARROW_EXPORT arrow::Status InferArrowTypeAndSize( PyObject* obj, int64_t* size, std::shared_ptr<arrow::DataType>* out_type); -ARROW_EXPORT arrow::Status InferArrowSize(PyObject* obj, int64_t* size); ARROW_EXPORT arrow::Status AppendPySequence(PyObject* obj, int64_t size, const std::shared_ptr<arrow::DataType>& type, @@ -53,15 +53,21 @@ ARROW_EXPORT arrow::Status AppendPySequence(PyObject* obj, int64_t size, ARROW_EXPORT Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out); -// Size inference +// Type inference only ARROW_EXPORT -Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out, - const std::shared_ptr<DataType>& type); +Status ConvertPySequence(PyObject* obj, int64_t size, MemoryPool* pool, + std::shared_ptr<Array>* out); + +// Size inference only +ARROW_EXPORT +Status ConvertPySequence(PyObject* obj, const std::shared_ptr<DataType>& type, + MemoryPool* pool, std::shared_ptr<Array>* out); // No inference ARROW_EXPORT -Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out, - const std::shared_ptr<DataType>& type, int64_t size); +Status ConvertPySequence(PyObject* obj, int64_t size, + const std::shared_ptr<DataType>& type, MemoryPool* pool, + std::shared_ptr<Array>* out); ARROW_EXPORT Status InvalidConversion(PyObject* obj, const std::string& expected_type_name, diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index cca9425..caeefd2 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -21,14 +21,21 @@ cdef _sequence_to_array(object sequence, object size, DataType type, cdef shared_ptr[CArray] out cdef int64_t c_size if type is None: - with nogil: - check_status(ConvertPySequence(sequence, pool, &out)) + if size is None: + with nogil: + check_status(ConvertPySequence(sequence, pool, &out)) + else: + c_size = size + with nogil: + check_status( + ConvertPySequence(sequence, c_size, pool, &out) + ) else: if size is None: with nogil: check_status( ConvertPySequence( - sequence, pool, &out, type.sp_type + sequence, type.sp_type, pool, &out, ) ) else: @@ -36,7 +43,7 @@ cdef _sequence_to_array(object sequence, object size, DataType type, with nogil: check_status( ConvertPySequence( - sequence, pool, &out, type.sp_type, c_size + sequence, c_size, type.sp_type, pool, &out, ) ) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 91bc96d..2e83f07 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -852,13 +852,14 @@ cdef extern from "arrow/python/api.h" namespace "arrow::py" nogil: shared_ptr[CDataType] GetTimestampType(TimeUnit unit) CStatus ConvertPySequence(object obj, CMemoryPool* pool, shared_ptr[CArray]* out) - CStatus ConvertPySequence(object obj, CMemoryPool* pool, - shared_ptr[CArray]* out, - const shared_ptr[CDataType]& type) - CStatus ConvertPySequence(object obj, CMemoryPool* pool, - shared_ptr[CArray]* out, + CStatus ConvertPySequence(object obj, const shared_ptr[CDataType]& type, + CMemoryPool* pool, shared_ptr[CArray]* out) + CStatus ConvertPySequence(object obj, int64_t size, CMemoryPool* pool, + shared_ptr[CArray]* out) + CStatus ConvertPySequence(object obj, int64_t size, const shared_ptr[CDataType]& type, - int64_t size) + CMemoryPool* pool, + shared_ptr[CArray]* out) CStatus NumPyDtypeToArrow(object dtype, shared_ptr[CDataType]* type) diff --git a/python/pyarrow/tests/test_convert_builtin.py b/python/pyarrow/tests/test_convert_builtin.py index fa603b1..2b317df 100644 --- a/python/pyarrow/tests/test_convert_builtin.py +++ b/python/pyarrow/tests/test_convert_builtin.py @@ -23,6 +23,7 @@ import pyarrow as pa import datetime import decimal +import itertools import numpy as np import six @@ -68,6 +69,24 @@ def test_limited_iterator_size_underflow(): assert arr1.equals(arr2) +def test_iterator_without_size(): + expected = pa.array((0, 1, 2)) + arr1 = pa.array(iter(range(3))) + assert arr1.equals(expected) + # Same with explicit type + arr1 = pa.array(iter(range(3)), type=pa.int64()) + assert arr1.equals(expected) + + +def test_infinite_iterator(): + expected = pa.array((0, 1, 2)) + arr1 = pa.array(itertools.count(0), size=3) + assert arr1.equals(expected) + # Same with explicit type + arr1 = pa.array(itertools.count(0), type=pa.int64(), size=3) + assert arr1.equals(expected) + + def _as_list(xs): return xs -- To stop receiving notification emails like this one, please contact [email protected].
