http://git-wip-us.apache.org/repos/asf/arrow/blob/9b1b3979/python/src/pyarrow/adapters/pandas.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index 8c2d350..6623e23 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -49,6 +49,7 @@ namespace pyarrow { using arrow::Array; using arrow::ChunkedArray; using arrow::Column; +using arrow::DictionaryType; using arrow::Field; using arrow::DataType; using arrow::ListType; @@ -60,7 +61,7 @@ using arrow::Type; namespace BitUtil = arrow::BitUtil; // ---------------------------------------------------------------------- -// Serialization +// Utility code template <int TYPE> struct npy_traits {}; @@ -242,1577 +243,1730 @@ Status AppendObjectStrings(arrow::StringBuilder& string_builder, PyObject** obje } template <int TYPE> -class ArrowSerializer { - public: - ArrowSerializer(arrow::MemoryPool* pool, PyArrayObject* arr, PyArrayObject* mask) - : pool_(pool), arr_(arr), mask_(mask) { - length_ = PyArray_SIZE(arr_); - } +struct arrow_traits {}; - void IndicateType(const std::shared_ptr<Field> field) { field_indicator_ = field; } +template <> +struct arrow_traits<Type::BOOL> { + static constexpr int npy_type = NPY_BOOL; + static constexpr bool supports_nulls = false; + static constexpr bool is_boolean = true; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = false; +}; - Status Convert(std::shared_ptr<Array>* out); +#define INT_DECL(TYPE) \ + template <> \ + struct arrow_traits<Type::TYPE> { \ + static constexpr int npy_type = NPY_##TYPE; \ + static constexpr bool supports_nulls = false; \ + static constexpr double na_value = NAN; \ + static constexpr bool is_boolean = false; \ + static constexpr bool is_numeric_not_nullable = true; \ + static constexpr bool is_numeric_nullable = false; \ + typedef typename npy_traits<NPY_##TYPE>::value_type T; \ + }; - int stride() const { return PyArray_STRIDES(arr_)[0]; } +INT_DECL(INT8); +INT_DECL(INT16); +INT_DECL(INT32); +INT_DECL(INT64); +INT_DECL(UINT8); +INT_DECL(UINT16); +INT_DECL(UINT32); +INT_DECL(UINT64); - Status InitNullBitmap() { - int null_bytes = BitUtil::BytesForBits(length_); +template <> +struct arrow_traits<Type::FLOAT> { + static constexpr int npy_type = NPY_FLOAT32; + static constexpr bool supports_nulls = true; + static constexpr float na_value = NAN; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; + typedef typename npy_traits<NPY_FLOAT32>::value_type T; +}; - null_bitmap_ = std::make_shared<arrow::PoolBuffer>(pool_); - RETURN_NOT_OK(null_bitmap_->Resize(null_bytes)); +template <> +struct arrow_traits<Type::DOUBLE> { + static constexpr int npy_type = NPY_FLOAT64; + static constexpr bool supports_nulls = true; + static constexpr double na_value = NAN; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; + typedef typename npy_traits<NPY_FLOAT64>::value_type T; +}; - null_bitmap_data_ = null_bitmap_->mutable_data(); - memset(null_bitmap_data_, 0, null_bytes); +static constexpr int64_t kPandasTimestampNull = std::numeric_limits<int64_t>::min(); - return Status::OK(); - } +template <> +struct arrow_traits<Type::TIMESTAMP> { + static constexpr int npy_type = NPY_DATETIME; + static constexpr bool supports_nulls = true; + static constexpr int64_t na_value = kPandasTimestampNull; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; + typedef typename npy_traits<NPY_DATETIME>::value_type T; +}; - bool is_strided() const { - npy_intp* astrides = PyArray_STRIDES(arr_); - return astrides[0] != PyArray_DESCR(arr_)->elsize; - } +template <> +struct arrow_traits<Type::DATE> { + static constexpr int npy_type = NPY_DATETIME; + static constexpr bool supports_nulls = true; + static constexpr int64_t na_value = kPandasTimestampNull; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = true; + typedef typename npy_traits<NPY_DATETIME>::value_type T; +}; - private: - Status ConvertData(); +template <> +struct arrow_traits<Type::STRING> { + static constexpr int npy_type = NPY_OBJECT; + static constexpr bool supports_nulls = true; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = false; +}; - Status ConvertDates(std::shared_ptr<Array>* out) { - PyAcquireGIL lock; +template <> +struct arrow_traits<Type::BINARY> { + static constexpr int npy_type = NPY_OBJECT; + static constexpr bool supports_nulls = true; + static constexpr bool is_boolean = false; + static constexpr bool is_numeric_not_nullable = false; + static constexpr bool is_numeric_nullable = false; +}; - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - arrow::TypePtr string_type(new arrow::DateType()); - arrow::DateBuilder date_builder(pool_, string_type); - RETURN_NOT_OK(date_builder.Resize(length_)); +template <typename T> +struct WrapBytes {}; - Status s; - PyObject* obj; - for (int64_t i = 0; i < length_; ++i) { - obj = objects[i]; - if (PyDate_CheckExact(obj)) { - PyDateTime_Date* pydate = reinterpret_cast<PyDateTime_Date*>(obj); - date_builder.Append(PyDate_to_ms(pydate)); - } else { - date_builder.AppendNull(); - } - } - return date_builder.Finish(out); +template <> +struct WrapBytes<arrow::StringArray> { + static inline PyObject* Wrap(const uint8_t* data, int64_t length) { + return PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(data), length); } +}; - Status ConvertObjectStrings(std::shared_ptr<Array>* out) { - PyAcquireGIL lock; +template <> +struct WrapBytes<arrow::BinaryArray> { + static inline PyObject* Wrap(const uint8_t* data, int64_t length) { + return PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), length); + } +}; - // The output type at this point is inconclusive because there may be bytes - // and unicode mixed in the object array +inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) { + if (type == NPY_DATETIME) { + PyArray_Descr* descr = PyArray_DESCR(out); + auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); + if (datatype->type == Type::TIMESTAMP) { + auto timestamp_type = static_cast<arrow::TimestampType*>(datatype); - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - arrow::TypePtr string_type(new arrow::StringType()); - arrow::StringBuilder string_builder(pool_, string_type); - RETURN_NOT_OK(string_builder.Resize(length_)); + switch (timestamp_type->unit) { + case arrow::TimestampType::Unit::SECOND: + date_dtype->meta.base = NPY_FR_s; + break; + case arrow::TimestampType::Unit::MILLI: + date_dtype->meta.base = NPY_FR_ms; + break; + case arrow::TimestampType::Unit::MICRO: + date_dtype->meta.base = NPY_FR_us; + break; + case arrow::TimestampType::Unit::NANO: + date_dtype->meta.base = NPY_FR_ns; + break; + } + } else { + // datatype->type == Type::DATE + date_dtype->meta.base = NPY_FR_D; + } + } +} - Status s; - bool have_bytes = false; - RETURN_NOT_OK(AppendObjectStrings(string_builder, objects, length_, &have_bytes)); - RETURN_NOT_OK(string_builder.Finish(out)); +template <typename T> +inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + // Upcast to double, set NaN as appropriate - if (have_bytes) { - const auto& arr = static_cast<const arrow::StringArray&>(*out->get()); - *out = std::make_shared<arrow::BinaryArray>( - arr.length(), arr.offsets(), arr.data(), arr.null_count(), arr.null_bitmap()); + for (int i = 0; i < arr->length(); ++i) { + *out_values++ = prim_arr->IsNull(i) ? NAN : in_values[i]; } - return Status::OK(); } +} - Status ConvertBooleans(std::shared_ptr<Array>* out) { - PyAcquireGIL lock; +template <typename T> +inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); + } +} - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); +template <typename InType, typename OutType> +inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); + for (int32_t i = 0; i < arr->length(); ++i) { + *out_values = in_values[i]; + } + } +} - int nbytes = BitUtil::BytesForBits(length_); - auto data = std::make_shared<arrow::PoolBuffer>(pool_); - RETURN_NOT_OK(data->Resize(nbytes)); - uint8_t* bitmap = data->mutable_data(); - memset(bitmap, 0, nbytes); +static Status ConvertBooleanWithNulls(const ChunkedArray& data, PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get()); - int64_t null_count = 0; - for (int64_t i = 0; i < length_; ++i) { - if (objects[i] == Py_True) { - BitUtil::SetBit(bitmap, i); - BitUtil::SetBit(null_bitmap_data_, i); - } else if (objects[i] != Py_False) { - ++null_count; + for (int64_t i = 0; i < arr->length(); ++i) { + if (bool_arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values++ = Py_None; + } else if (bool_arr->Value(i)) { + // True + Py_INCREF(Py_True); + *out_values++ = Py_True; } else { - BitUtil::SetBit(null_bitmap_data_, i); + // False + Py_INCREF(Py_False); + *out_values++ = Py_False; } } - - *out = std::make_shared<arrow::BooleanArray>(length_, data, null_count, null_bitmap_); - - return Status::OK(); } + return Status::OK(); +} - template <int ITEM_TYPE, typename ArrowType> - Status ConvertTypedLists( - const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out); - -#define LIST_CASE(TYPE, NUMPY_TYPE, ArrowType) \ - case Type::TYPE: { \ - return ConvertTypedLists<NUMPY_TYPE, ::arrow::ArrowType>(field, out); \ +static void ConvertBooleanNoNulls(const ChunkedArray& data, uint8_t* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get()); + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = static_cast<uint8_t>(bool_arr->Value(i)); + } } +} - Status ConvertLists(const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { - switch (field->type->type) { - LIST_CASE(UINT8, NPY_UINT8, UInt8Type) - LIST_CASE(INT8, NPY_INT8, Int8Type) - LIST_CASE(UINT16, NPY_UINT16, UInt16Type) - LIST_CASE(INT16, NPY_INT16, Int16Type) - LIST_CASE(UINT32, NPY_UINT32, UInt32Type) - LIST_CASE(INT32, NPY_INT32, Int32Type) - LIST_CASE(UINT64, NPY_UINT64, UInt64Type) - LIST_CASE(INT64, NPY_INT64, Int64Type) - LIST_CASE(TIMESTAMP, NPY_DATETIME, TimestampType) - LIST_CASE(FLOAT, NPY_FLOAT, FloatType) - LIST_CASE(DOUBLE, NPY_DOUBLE, DoubleType) - LIST_CASE(STRING, NPY_OBJECT, StringType) - default: - return Status::TypeError("Unknown list item type"); - } - - return Status::TypeError("Unknown list type"); - } - - Status MakeDataType(std::shared_ptr<DataType>* out); - - arrow::MemoryPool* pool_; - - PyArrayObject* arr_; - PyArrayObject* mask_; - - int64_t length_; - - std::shared_ptr<Field> field_indicator_; - std::shared_ptr<arrow::Buffer> data_; - std::shared_ptr<arrow::ResizableBuffer> null_bitmap_; - uint8_t* null_bitmap_data_; -}; +template <typename ArrayType> +inline Status ConvertBinaryLike(const ChunkedArray& data, PyObject** out_values) { + PyAcquireGIL lock; + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = static_cast<ArrayType*>(data.chunk(c).get()); -// Returns null count -static int64_t MaskToBitmap(PyArrayObject* mask, int64_t length, uint8_t* bitmap) { - int64_t null_count = 0; - const uint8_t* mask_values = static_cast<const uint8_t*>(PyArray_DATA(mask)); - // TODO(wesm): strided null mask - for (int i = 0; i < length; ++i) { - if (mask_values[i]) { - ++null_count; - } else { - BitUtil::SetBit(bitmap, i); + const uint8_t* data_ptr; + int32_t length; + const bool has_nulls = data.null_count() > 0; + for (int64_t i = 0; i < arr->length(); ++i) { + if (has_nulls && arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values = Py_None; + } else { + data_ptr = arr->GetValue(i, &length); + *out_values = WrapBytes<ArrayType>::Wrap(data_ptr, length); + if (*out_values == nullptr) { + PyErr_Clear(); + std::stringstream ss; + ss << "Wrapping " + << std::string(reinterpret_cast<const char*>(data_ptr), length) << " failed"; + return Status::UnknownError(ss.str()); + } + } + ++out_values; } } - return null_count; -} - -template <int TYPE> -inline Status ArrowSerializer<TYPE>::MakeDataType(std::shared_ptr<DataType>* out) { - out->reset(new typename npy_traits<TYPE>::TypeClass()); return Status::OK(); } -template <> -inline Status ArrowSerializer<NPY_DATETIME>::MakeDataType( - std::shared_ptr<DataType>* out) { - PyArray_Descr* descr = PyArray_DESCR(arr_); - auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); - arrow::TimestampType::Unit unit; +template <typename ArrowType> +inline Status ConvertListsLike( + const std::shared_ptr<Column>& col, PyObject** out_values) { + const ChunkedArray& data = *col->data().get(); + auto list_type = std::static_pointer_cast<ListType>(col->type()); - switch (date_dtype->meta.base) { - case NPY_FR_s: - unit = arrow::TimestampType::Unit::SECOND; - break; - case NPY_FR_ms: - unit = arrow::TimestampType::Unit::MILLI; - break; - case NPY_FR_us: - unit = arrow::TimestampType::Unit::MICRO; - break; - case NPY_FR_ns: - unit = arrow::TimestampType::Unit::NANO; - break; - default: - return Status::Invalid("Unknown NumPy datetime unit"); + // Get column of underlying value arrays + std::vector<std::shared_ptr<Array>> value_arrays; + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c)); + value_arrays.emplace_back(arr->values()); } + auto flat_column = std::make_shared<Column>(list_type->value_field(), value_arrays); + // TODO(ARROW-489): Currently we don't have a Python reference for single columns. + // Storing a reference to the whole Array would be to expensive. + PyObject* numpy_array; + RETURN_NOT_OK(ConvertColumnToPandas(flat_column, nullptr, &numpy_array)); - out->reset(new arrow::TimestampType(unit)); - return Status::OK(); -} - -template <int TYPE> -inline Status ArrowSerializer<TYPE>::Convert(std::shared_ptr<Array>* out) { - typedef npy_traits<TYPE> traits; + PyAcquireGIL lock; - if (mask_ != nullptr || traits::supports_nulls) { RETURN_NOT_OK(InitNullBitmap()); } + for (int c = 0; c < data.num_chunks(); c++) { + auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c)); - int64_t null_count = 0; - if (mask_ != nullptr) { - null_count = MaskToBitmap(mask_, length_, null_bitmap_data_); - } else if (traits::supports_nulls) { - null_count = ValuesToBitmap<TYPE>(PyArray_DATA(arr_), length_, null_bitmap_data_); + const uint8_t* data_ptr; + int32_t length; + const bool has_nulls = data.null_count() > 0; + for (int64_t i = 0; i < arr->length(); ++i) { + if (has_nulls && arr->IsNull(i)) { + Py_INCREF(Py_None); + *out_values = Py_None; + } else { + PyObject* start = PyLong_FromLong(arr->value_offset(i)); + PyObject* end = PyLong_FromLong(arr->value_offset(i + 1)); + PyObject* slice = PySlice_New(start, end, NULL); + *out_values = PyObject_GetItem(numpy_array, slice); + Py_DECREF(start); + Py_DECREF(end); + Py_DECREF(slice); + } + ++out_values; + } } - RETURN_NOT_OK(ConvertData()); - std::shared_ptr<DataType> type; - RETURN_NOT_OK(MakeDataType(&type)); - RETURN_NOT_OK(MakePrimitiveArray(type, length_, data_, null_count, null_bitmap_, out)); + Py_XDECREF(numpy_array); return Status::OK(); } -template <> -inline Status ArrowSerializer<NPY_OBJECT>::Convert(std::shared_ptr<Array>* out) { - // Python object arrays are annoying, since we could have one of: - // - // * Strings - // * Booleans with nulls - // * Mixed type (not supported at the moment by arrow format) - // - // Additionally, nulls may be encoded either as np.nan or None. So we have to - // do some type inference and conversion - - RETURN_NOT_OK(InitNullBitmap()); +template <typename T> +inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); - // TODO: mask not supported here - const PyObject** objects = reinterpret_cast<const PyObject**>(PyArray_DATA(arr_)); - { - PyAcquireGIL lock; - PyDateTime_IMPORT; - } + const uint8_t* valid_bits = arr->null_bitmap_data(); - if (field_indicator_) { - switch (field_indicator_->type->type) { - case Type::STRING: - return ConvertObjectStrings(out); - case Type::BOOL: - return ConvertBooleans(out); - case Type::DATE: - return ConvertDates(out); - case Type::LIST: { - auto list_field = static_cast<ListType*>(field_indicator_->type.get()); - return ConvertLists(list_field->value_field(), out); - } - default: - return Status::TypeError("No known conversion to Arrow type"); - } - } else { - for (int64_t i = 0; i < length_; ++i) { - if (PyObject_is_null(objects[i])) { - continue; - } else if (PyObject_is_string(objects[i])) { - return ConvertObjectStrings(out); - } else if (PyBool_Check(objects[i])) { - return ConvertBooleans(out); - } else if (PyDate_CheckExact(objects[i])) { - return ConvertDates(out); - } else { - return Status::TypeError("unhandled python type"); + if (arr->null_count() > 0) { + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = BitUtil::BitNotSet(valid_bits, i) ? na_value : in_values[i]; } + } else { + memcpy(out_values, in_values, sizeof(T) * arr->length()); + out_values += arr->length(); } } - - return Status::TypeError("Unable to infer type of object array, were all null"); } -template <int TYPE> -inline Status ArrowSerializer<TYPE>::ConvertData() { - // TODO(wesm): strided arrays - if (is_strided()) { return Status::Invalid("no support for strided data yet"); } +template <typename InType, typename OutType> +inline void ConvertNumericNullableCast( + const ChunkedArray& data, OutType na_value, OutType* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); - data_ = std::make_shared<NumPyBuffer>(arr_); - return Status::OK(); + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = arr->IsNull(i) ? na_value : static_cast<OutType>(in_values[i]); + } + } } -template <> -inline Status ArrowSerializer<NPY_BOOL>::ConvertData() { - if (is_strided()) { return Status::Invalid("no support for strided data yet"); } - - int nbytes = BitUtil::BytesForBits(length_); - auto buffer = std::make_shared<arrow::PoolBuffer>(pool_); - RETURN_NOT_OK(buffer->Resize(nbytes)); - - const uint8_t* values = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_)); - - uint8_t* bitmap = buffer->mutable_data(); +template <typename T> +inline void ConvertDates(const ChunkedArray& data, T na_value, T* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); - memset(bitmap, 0, nbytes); - for (int i = 0; i < length_; ++i) { - if (values[i] > 0) { BitUtil::SetBit(bitmap, i); } + for (int64_t i = 0; i < arr->length(); ++i) { + // There are 1000 * 60 * 60 * 24 = 86400000ms in a day + *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000; + } } +} - data_ = buffer; +template <typename InType, int SHIFT> +inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values) { + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); + auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); - return Status::OK(); + for (int64_t i = 0; i < arr->length(); ++i) { + *out_values++ = arr->IsNull(i) ? kPandasTimestampNull + : (static_cast<int64_t>(in_values[i]) * SHIFT); + } + } } -template <int TYPE> -template <int ITEM_TYPE, typename ArrowType> -inline Status ArrowSerializer<TYPE>::ConvertTypedLists( - const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { - typedef npy_traits<ITEM_TYPE> traits; - typedef typename traits::value_type T; - typedef typename traits::BuilderClass BuilderT; +// ---------------------------------------------------------------------- +// pandas 0.x DataFrame conversion internals - auto value_builder = std::make_shared<BuilderT>(pool_, field->type); - ListBuilder list_builder(pool_, value_builder); - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - for (int64_t i = 0; i < length_; ++i) { - if (PyObject_is_null(objects[i])) { - RETURN_NOT_OK(list_builder.AppendNull()); - } else if (PyArray_Check(objects[i])) { - auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]); - RETURN_NOT_OK(list_builder.Append(true)); +class PandasBlock { + public: + enum type { + OBJECT, + UINT8, + INT8, + UINT16, + INT16, + UINT32, + INT32, + UINT64, + INT64, + FLOAT, + DOUBLE, + BOOL, + DATETIME, + CATEGORICAL + }; - // TODO(uwe): Support more complex numpy array structures - RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, ITEM_TYPE)); + PandasBlock(int64_t num_rows, int num_columns) + : num_rows_(num_rows), num_columns_(num_columns) {} + virtual ~PandasBlock() {} - int32_t size = PyArray_DIM(numpy_array, 0); - auto data = reinterpret_cast<const T*>(PyArray_DATA(numpy_array)); - if (traits::supports_nulls) { - null_bitmap_->Resize(size, false); - // TODO(uwe): A bitmap would be more space-efficient but the Builder API doesn't - // currently support this. - // ValuesToBitmap<ITEM_TYPE>(data, size, null_bitmap_->mutable_data()); - ValuesToBytemap<ITEM_TYPE>(data, size, null_bitmap_->mutable_data()); - RETURN_NOT_OK(value_builder->Append(data, size, null_bitmap_->data())); - } else { - RETURN_NOT_OK(value_builder->Append(data, size)); - } - } else if (PyList_Check(objects[i])) { - return Status::TypeError("Python lists are not yet supported"); - } else { - return Status::TypeError("Unsupported Python type for list items"); - } - } - return list_builder.Finish(out); -} + virtual Status Allocate() = 0; + virtual Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) = 0; -template <> -template <> -inline Status -ArrowSerializer<NPY_OBJECT>::ConvertTypedLists<NPY_OBJECT, ::arrow::StringType>( - const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { - // TODO: If there are bytes involed, convert to Binary representation - bool have_bytes = false; + PyObject* block_arr() const { return block_arr_.obj(); } - auto value_builder = std::make_shared<arrow::StringBuilder>(pool_, field->type); - ListBuilder list_builder(pool_, value_builder); - PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); - for (int64_t i = 0; i < length_; ++i) { - if (PyObject_is_null(objects[i])) { - RETURN_NOT_OK(list_builder.AppendNull()); - } else if (PyArray_Check(objects[i])) { - auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]); - RETURN_NOT_OK(list_builder.Append(true)); + virtual Status GetPyResult(PyObject** output) { + PyObject* result = PyDict_New(); + RETURN_IF_PYERROR(); - // TODO(uwe): Support more complex numpy array structures - RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, NPY_OBJECT)); + PyDict_SetItemString(result, "block", block_arr_.obj()); + PyDict_SetItemString(result, "placement", placement_arr_.obj()); - int32_t size = PyArray_DIM(numpy_array, 0); - auto data = reinterpret_cast<PyObject**>(PyArray_DATA(numpy_array)); - RETURN_NOT_OK(AppendObjectStrings(*value_builder.get(), data, size, &have_bytes)); - } else if (PyList_Check(objects[i])) { - return Status::TypeError("Python lists are not yet supported"); + *output = result; + + return Status::OK(); + } + + protected: + Status AllocateNDArray(int npy_type, int ndim = 2) { + PyAcquireGIL lock; + + PyObject* block_arr; + if (ndim == 2) { + npy_intp block_dims[2] = {num_columns_, num_rows_}; + block_arr = PyArray_SimpleNew(2, block_dims, npy_type); } else { - return Status::TypeError("Unsupported Python type for list items"); + npy_intp block_dims[1] = {num_rows_}; + block_arr = PyArray_SimpleNew(1, block_dims, npy_type); } - } - return list_builder.Finish(out); -} -template <> -inline Status ArrowSerializer<NPY_OBJECT>::ConvertData() { - return Status::TypeError("NYI"); -} + if (block_arr == NULL) { + // TODO(wesm): propagating Python exception + return Status::OK(); + } -#define TO_ARROW_CASE(TYPE) \ - case NPY_##TYPE: { \ - ArrowSerializer<NPY_##TYPE> converter(pool, arr, mask); \ - RETURN_NOT_OK(converter.Convert(out)); \ - } break; + npy_intp placement_dims[1] = {num_columns_}; + PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64); + if (placement_arr == NULL) { + // TODO(wesm): propagating Python exception + return Status::OK(); + } -Status PandasMaskedToArrow(arrow::MemoryPool* pool, PyObject* ao, PyObject* mo, - const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { - PyArrayObject* arr = reinterpret_cast<PyArrayObject*>(ao); - PyArrayObject* mask = nullptr; + block_arr_.reset(block_arr); + placement_arr_.reset(placement_arr); - if (mo != nullptr) { mask = reinterpret_cast<PyArrayObject*>(mo); } + block_data_ = reinterpret_cast<uint8_t*>( + PyArray_DATA(reinterpret_cast<PyArrayObject*>(block_arr))); - if (PyArray_NDIM(arr) != 1) { - return Status::Invalid("only handle 1-dimensional arrays"); + placement_data_ = reinterpret_cast<int64_t*>( + PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr))); + + return Status::OK(); } - switch (PyArray_DESCR(arr)->type_num) { - TO_ARROW_CASE(BOOL); - TO_ARROW_CASE(INT8); - TO_ARROW_CASE(INT16); - TO_ARROW_CASE(INT32); - TO_ARROW_CASE(INT64); - TO_ARROW_CASE(UINT8); - TO_ARROW_CASE(UINT16); - TO_ARROW_CASE(UINT32); - TO_ARROW_CASE(UINT64); - TO_ARROW_CASE(FLOAT32); - TO_ARROW_CASE(FLOAT64); - TO_ARROW_CASE(DATETIME); - case NPY_OBJECT: { - ArrowSerializer<NPY_OBJECT> converter(pool, arr, mask); - converter.IndicateType(field); - RETURN_NOT_OK(converter.Convert(out)); - } break; - default: + int64_t num_rows_; + int num_columns_; + + OwnedRef block_arr_; + uint8_t* block_data_; + + // ndarray<int32> + OwnedRef placement_arr_; + int64_t* placement_data_; + + DISALLOW_COPY_AND_ASSIGN(PandasBlock); +}; + +#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum) \ + case Type::ArrowEnum: \ + RETURN_NOT_OK((ConvertListsLike<::arrow::ArrowType>(col, out_buffer))); \ + break; + +class ObjectBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; + virtual ~ObjectBlock() {} + + Status Allocate() override { return AllocateNDArray(NPY_OBJECT); } + + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; + + PyObject** out_buffer = + reinterpret_cast<PyObject**>(block_data_) + rel_placement * num_rows_; + + const ChunkedArray& data = *col->data().get(); + + if (type == Type::BOOL) { + RETURN_NOT_OK(ConvertBooleanWithNulls(data, out_buffer)); + } else if (type == Type::BINARY) { + RETURN_NOT_OK(ConvertBinaryLike<arrow::BinaryArray>(data, out_buffer)); + } else if (type == Type::STRING) { + RETURN_NOT_OK(ConvertBinaryLike<arrow::StringArray>(data, out_buffer)); + } else if (type == Type::LIST) { + auto list_type = std::static_pointer_cast<ListType>(col->type()); + switch (list_type->value_type()->type) { + CONVERTLISTSLIKE_CASE(UInt8Type, UINT8) + CONVERTLISTSLIKE_CASE(Int8Type, INT8) + CONVERTLISTSLIKE_CASE(UInt16Type, UINT16) + CONVERTLISTSLIKE_CASE(Int16Type, INT16) + CONVERTLISTSLIKE_CASE(UInt32Type, UINT32) + CONVERTLISTSLIKE_CASE(Int32Type, INT32) + CONVERTLISTSLIKE_CASE(UInt64Type, UINT64) + CONVERTLISTSLIKE_CASE(Int64Type, INT64) + CONVERTLISTSLIKE_CASE(TimestampType, TIMESTAMP) + CONVERTLISTSLIKE_CASE(FloatType, FLOAT) + CONVERTLISTSLIKE_CASE(DoubleType, DOUBLE) + CONVERTLISTSLIKE_CASE(StringType, STRING) + default: { + std::stringstream ss; + ss << "Not implemented type for lists: " << list_type->value_type()->ToString(); + return Status::NotImplemented(ss.str()); + } + } + } else { std::stringstream ss; - ss << "unsupported type " << PyArray_DESCR(arr)->type_num << std::endl; + ss << "Unsupported type for object array output: " << col->type()->ToString(); return Status::NotImplemented(ss.str()); + } + + placement_data_[rel_placement] = abs_placement; + return Status::OK(); } - return Status::OK(); -} +}; -Status PandasToArrow(arrow::MemoryPool* pool, PyObject* ao, - const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { - return PandasMaskedToArrow(pool, ao, nullptr, field, out); -} +template <int ARROW_TYPE, typename C_TYPE> +class IntBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; -// ---------------------------------------------------------------------- -// Deserialization + Status Allocate() override { + return AllocateNDArray(arrow_traits<ARROW_TYPE>::npy_type); + } -template <int TYPE> -struct arrow_traits {}; + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; -template <> -struct arrow_traits<arrow::Type::BOOL> { - static constexpr int npy_type = NPY_BOOL; - static constexpr bool supports_nulls = false; - static constexpr bool is_boolean = true; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = false; -}; + C_TYPE* out_buffer = + reinterpret_cast<C_TYPE*>(block_data_) + rel_placement * num_rows_; -#define INT_DECL(TYPE) \ - template <> \ - struct arrow_traits<arrow::Type::TYPE> { \ - static constexpr int npy_type = NPY_##TYPE; \ - static constexpr bool supports_nulls = false; \ - static constexpr double na_value = NAN; \ - static constexpr bool is_boolean = false; \ - static constexpr bool is_numeric_not_nullable = true; \ - static constexpr bool is_numeric_nullable = false; \ - typedef typename npy_traits<NPY_##TYPE>::value_type T; \ - }; + const ChunkedArray& data = *col->data().get(); -INT_DECL(INT8); -INT_DECL(INT16); -INT_DECL(INT32); -INT_DECL(INT64); -INT_DECL(UINT8); -INT_DECL(UINT16); -INT_DECL(UINT32); -INT_DECL(UINT64); + if (type != ARROW_TYPE) { return Status::NotImplemented(col->type()->ToString()); } -template <> -struct arrow_traits<arrow::Type::FLOAT> { - static constexpr int npy_type = NPY_FLOAT32; - static constexpr bool supports_nulls = true; - static constexpr float na_value = NAN; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = true; - typedef typename npy_traits<NPY_FLOAT32>::value_type T; + ConvertIntegerNoNullsSameType<C_TYPE>(data, out_buffer); + placement_data_[rel_placement] = abs_placement; + return Status::OK(); + } }; -template <> -struct arrow_traits<arrow::Type::DOUBLE> { - static constexpr int npy_type = NPY_FLOAT64; - static constexpr bool supports_nulls = true; - static constexpr double na_value = NAN; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = true; - typedef typename npy_traits<NPY_FLOAT64>::value_type T; -}; +using UInt8Block = IntBlock<Type::UINT8, uint8_t>; +using Int8Block = IntBlock<Type::INT8, int8_t>; +using UInt16Block = IntBlock<Type::UINT16, uint16_t>; +using Int16Block = IntBlock<Type::INT16, int16_t>; +using UInt32Block = IntBlock<Type::UINT32, uint32_t>; +using Int32Block = IntBlock<Type::INT32, int32_t>; +using UInt64Block = IntBlock<Type::UINT64, uint64_t>; +using Int64Block = IntBlock<Type::INT64, int64_t>; -static constexpr int64_t kPandasTimestampNull = std::numeric_limits<int64_t>::min(); +class Float32Block : public PandasBlock { + public: + using PandasBlock::PandasBlock; -template <> -struct arrow_traits<arrow::Type::TIMESTAMP> { - static constexpr int npy_type = NPY_DATETIME; - static constexpr bool supports_nulls = true; - static constexpr int64_t na_value = kPandasTimestampNull; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = true; - typedef typename npy_traits<NPY_DATETIME>::value_type T; -}; + Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); } -template <> -struct arrow_traits<arrow::Type::DATE> { - static constexpr int npy_type = NPY_DATETIME; - static constexpr bool supports_nulls = true; - static constexpr int64_t na_value = kPandasTimestampNull; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = true; - typedef typename npy_traits<NPY_DATETIME>::value_type T; + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; + + if (type != Type::FLOAT) { return Status::NotImplemented(col->type()->ToString()); } + + float* out_buffer = reinterpret_cast<float*>(block_data_) + rel_placement * num_rows_; + + ConvertNumericNullable<float>(*col->data().get(), NAN, out_buffer); + placement_data_[rel_placement] = abs_placement; + return Status::OK(); + } }; -template <> -struct arrow_traits<arrow::Type::STRING> { - static constexpr int npy_type = NPY_OBJECT; - static constexpr bool supports_nulls = true; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = false; -}; +class Float64Block : public PandasBlock { + public: + using PandasBlock::PandasBlock; + + Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); } + + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; + + double* out_buffer = + reinterpret_cast<double*>(block_data_) + rel_placement * num_rows_; + + const ChunkedArray& data = *col->data().get(); + +#define INTEGER_CASE(IN_TYPE) \ + ConvertIntegerWithNulls<IN_TYPE>(data, out_buffer); \ + break; + + switch (type) { + case Type::UINT8: + INTEGER_CASE(uint8_t); + case Type::INT8: + INTEGER_CASE(int8_t); + case Type::UINT16: + INTEGER_CASE(uint16_t); + case Type::INT16: + INTEGER_CASE(int16_t); + case Type::UINT32: + INTEGER_CASE(uint32_t); + case Type::INT32: + INTEGER_CASE(int32_t); + case Type::UINT64: + INTEGER_CASE(uint64_t); + case Type::INT64: + INTEGER_CASE(int64_t); + case Type::FLOAT: + ConvertNumericNullableCast<float, double>(data, NAN, out_buffer); + break; + case Type::DOUBLE: + ConvertNumericNullable<double>(data, NAN, out_buffer); + break; + default: + return Status::NotImplemented(col->type()->ToString()); + } + +#undef INTEGER_CASE -template <> -struct arrow_traits<arrow::Type::BINARY> { - static constexpr int npy_type = NPY_OBJECT; - static constexpr bool supports_nulls = true; - static constexpr bool is_boolean = false; - static constexpr bool is_numeric_not_nullable = false; - static constexpr bool is_numeric_nullable = false; + placement_data_[rel_placement] = abs_placement; + return Status::OK(); + } }; -template <typename T> -struct WrapBytes {}; +class BoolBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; -template <> -struct WrapBytes<arrow::StringArray> { - static inline PyObject* Wrap(const uint8_t* data, int64_t length) { - return PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(data), length); - } -}; + Status Allocate() override { return AllocateNDArray(NPY_BOOL); } -template <> -struct WrapBytes<arrow::BinaryArray> { - static inline PyObject* Wrap(const uint8_t* data, int64_t length) { - return PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), length); - } -}; + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; -inline void set_numpy_metadata(int type, DataType* datatype, PyArrayObject* out) { - if (type == NPY_DATETIME) { - PyArray_Descr* descr = PyArray_DESCR(out); - auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); - if (datatype->type == arrow::Type::TIMESTAMP) { - auto timestamp_type = static_cast<arrow::TimestampType*>(datatype); + if (type != Type::BOOL) { return Status::NotImplemented(col->type()->ToString()); } - switch (timestamp_type->unit) { - case arrow::TimestampType::Unit::SECOND: - date_dtype->meta.base = NPY_FR_s; - break; - case arrow::TimestampType::Unit::MILLI: - date_dtype->meta.base = NPY_FR_ms; - break; - case arrow::TimestampType::Unit::MICRO: - date_dtype->meta.base = NPY_FR_us; - break; - case arrow::TimestampType::Unit::NANO: - date_dtype->meta.base = NPY_FR_ns; - break; - } - } else { - // datatype->type == arrow::Type::DATE - date_dtype->meta.base = NPY_FR_D; - } + uint8_t* out_buffer = + reinterpret_cast<uint8_t*>(block_data_) + rel_placement * num_rows_; + + ConvertBooleanNoNulls(*col->data().get(), out_buffer); + placement_data_[rel_placement] = abs_placement; + return Status::OK(); } -} +}; -template <typename T> -inline void ConvertIntegerWithNulls(const ChunkedArray& data, double* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); - // Upcast to double, set NaN as appropriate +class DatetimeBlock : public PandasBlock { + public: + using PandasBlock::PandasBlock; - for (int i = 0; i < arr->length(); ++i) { - *out_values++ = prim_arr->IsNull(i) ? NAN : in_values[i]; - } - } -} + Status Allocate() override { + RETURN_NOT_OK(AllocateNDArray(NPY_DATETIME)); -template <typename T> -inline void ConvertIntegerNoNullsSameType(const ChunkedArray& data, T* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); - memcpy(out_values, in_values, sizeof(T) * arr->length()); - out_values += arr->length(); + PyAcquireGIL lock; + auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>( + PyArray_DESCR(reinterpret_cast<PyArrayObject*>(block_arr_.obj()))->c_metadata); + date_dtype->meta.base = NPY_FR_ns; + return Status::OK(); } -} -template <typename InType, typename OutType> -inline void ConvertIntegerNoNullsCast(const ChunkedArray& data, OutType* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); - for (int32_t i = 0; i < arr->length(); ++i) { - *out_values = in_values[i]; - } - } -} + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + Type::type type = col->type()->type; -static Status ConvertBooleanWithNulls(const ChunkedArray& data, PyObject** out_values) { - PyAcquireGIL lock; - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get()); + int64_t* out_buffer = + reinterpret_cast<int64_t*>(block_data_) + rel_placement * num_rows_; - for (int64_t i = 0; i < arr->length(); ++i) { - if (bool_arr->IsNull(i)) { - Py_INCREF(Py_None); - *out_values++ = Py_None; - } else if (bool_arr->Value(i)) { - // True - Py_INCREF(Py_True); - *out_values++ = Py_True; + const ChunkedArray& data = *col.get()->data(); + + if (type == Type::DATE) { + // DateType is millisecond timestamp stored as int64_t + // TODO(wesm): Do we want to make sure to zero out the milliseconds? + ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer); + } else if (type == Type::TIMESTAMP) { + auto ts_type = static_cast<arrow::TimestampType*>(col->type().get()); + + if (ts_type->unit == arrow::TimeUnit::NANO) { + ConvertNumericNullable<int64_t>(data, kPandasTimestampNull, out_buffer); + } else if (ts_type->unit == arrow::TimeUnit::MICRO) { + ConvertDatetimeNanos<int64_t, 1000L>(data, out_buffer); + } else if (ts_type->unit == arrow::TimeUnit::MILLI) { + ConvertDatetimeNanos<int64_t, 1000000L>(data, out_buffer); + } else if (ts_type->unit == arrow::TimeUnit::SECOND) { + ConvertDatetimeNanos<int64_t, 1000000000L>(data, out_buffer); } else { - // False - Py_INCREF(Py_False); - *out_values++ = Py_False; + return Status::NotImplemented("Unsupported time unit"); } + } else { + return Status::NotImplemented(col->type()->ToString()); } + + placement_data_[rel_placement] = abs_placement; + return Status::OK(); } - return Status::OK(); -} +}; -static void ConvertBooleanNoNulls(const ChunkedArray& data, uint8_t* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto bool_arr = static_cast<arrow::BooleanArray*>(arr.get()); - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = static_cast<uint8_t>(bool_arr->Value(i)); +template <int ARROW_INDEX_TYPE> +class CategoricalBlock : public PandasBlock { + public: + CategoricalBlock(int64_t num_rows) : PandasBlock(num_rows, 1) {} + + Status Allocate() override { + constexpr int npy_type = arrow_traits<ARROW_INDEX_TYPE>::npy_type; + + if (!(npy_type == NPY_INT8 || npy_type == NPY_INT16 || npy_type == NPY_INT32 || + npy_type == NPY_INT64)) { + return Status::Invalid("Category indices must be signed integers"); } + return AllocateNDArray(npy_type, 1); } -} -template <typename ArrayType> -inline Status ConvertBinaryLike(const ChunkedArray& data, PyObject** out_values) { - PyAcquireGIL lock; - for (int c = 0; c < data.num_chunks(); c++) { - auto arr = static_cast<ArrayType*>(data.chunk(c).get()); + Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, + int64_t rel_placement) override { + using T = typename arrow_traits<ARROW_INDEX_TYPE>::T; - const uint8_t* data_ptr; - int32_t length; - const bool has_nulls = data.null_count() > 0; - for (int64_t i = 0; i < arr->length(); ++i) { - if (has_nulls && arr->IsNull(i)) { - Py_INCREF(Py_None); - *out_values = Py_None; - } else { - data_ptr = arr->GetValue(i, &length); - *out_values = WrapBytes<ArrayType>::Wrap(data_ptr, length); - if (*out_values == nullptr) { - return Status::UnknownError("String initialization failed"); - } + T* out_values = reinterpret_cast<T*>(block_data_) + rel_placement * num_rows_; + + const ChunkedArray& data = *col->data().get(); + + for (int c = 0; c < data.num_chunks(); c++) { + const std::shared_ptr<Array> arr = data.chunk(c); + const auto& dict_arr = static_cast<const arrow::DictionaryArray&>(*arr); + const auto& indices = + static_cast<const arrow::PrimitiveArray&>(*dict_arr.indices()); + auto in_values = reinterpret_cast<const T*>(indices.data()->data()); + + // Null is -1 in CategoricalBlock + for (int i = 0; i < arr->length(); ++i) { + *out_values++ = indices.IsNull(i) ? -1 : in_values[i]; } - ++out_values; } - } - return Status::OK(); -} -template <typename ArrowType> -inline Status ConvertListsLike( - const std::shared_ptr<Column>& col, PyObject** out_values) { - typedef arrow_traits<ArrowType::type_id> traits; - typedef typename ::arrow::TypeTraits<ArrowType>::ArrayType ArrayType; + placement_data_[rel_placement] = abs_placement; - const ChunkedArray& data = *col->data().get(); - auto list_type = std::static_pointer_cast<ListType>(col->type()); + auto dict_type = static_cast<const DictionaryType*>(col->type().get()); - // Get column of underlying value arrays - std::vector<std::shared_ptr<Array>> value_arrays; - for (int c = 0; c < data.num_chunks(); c++) { - auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c)); - value_arrays.emplace_back(arr->values()); + PyObject* dict; + RETURN_NOT_OK(ConvertArrayToPandas(dict_type->dictionary(), nullptr, &dict)); + dictionary_.reset(dict); + + return Status::OK(); } - auto flat_column = std::make_shared<Column>(list_type->value_field(), value_arrays); - // TODO(ARROW-489): Currently we don't have a Python reference for single columns. - // Storing a reference to the whole Array would be to expensive. - PyObject* numpy_array; - RETURN_NOT_OK(ConvertColumnToPandas(flat_column, nullptr, &numpy_array)); - PyAcquireGIL lock; + Status GetPyResult(PyObject** output) override { + PyObject* result = PyDict_New(); + RETURN_IF_PYERROR(); - for (int c = 0; c < data.num_chunks(); c++) { - auto arr = std::static_pointer_cast<arrow::ListArray>(data.chunk(c)); + PyDict_SetItemString(result, "block", block_arr_.obj()); + PyDict_SetItemString(result, "dictionary", dictionary_.obj()); + PyDict_SetItemString(result, "placement", placement_arr_.obj()); - const uint8_t* data_ptr; - int32_t length; - const bool has_nulls = data.null_count() > 0; - for (int64_t i = 0; i < arr->length(); ++i) { - if (has_nulls && arr->IsNull(i)) { - Py_INCREF(Py_None); - *out_values = Py_None; - } else { - PyObject* start = PyLong_FromLong(arr->value_offset(i)); - PyObject* end = PyLong_FromLong(arr->value_offset(i + 1)); - PyObject* slice = PySlice_New(start, end, NULL); - *out_values = PyObject_GetItem(numpy_array, slice); - Py_DECREF(start); - Py_DECREF(end); - Py_DECREF(slice); - } - ++out_values; - } - } + *output = result; - Py_XDECREF(numpy_array); - return Status::OK(); -} + return Status::OK(); + } -template <typename T> -inline void ConvertNumericNullable(const ChunkedArray& data, T na_value, T* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + protected: + OwnedRef dictionary_; +}; - const uint8_t* valid_bits = arr->null_bitmap_data(); +Status MakeBlock(PandasBlock::type type, int64_t num_rows, int num_columns, + std::shared_ptr<PandasBlock>* block) { +#define BLOCK_CASE(NAME, TYPE) \ + case PandasBlock::NAME: \ + *block = std::make_shared<TYPE>(num_rows, num_columns); \ + break; - if (arr->null_count() > 0) { - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = BitUtil::BitNotSet(valid_bits, i) ? na_value : in_values[i]; - } - } else { - memcpy(out_values, in_values, sizeof(T) * arr->length()); - out_values += arr->length(); - } + switch (type) { + BLOCK_CASE(OBJECT, ObjectBlock); + BLOCK_CASE(UINT8, UInt8Block); + BLOCK_CASE(INT8, Int8Block); + BLOCK_CASE(UINT16, UInt16Block); + BLOCK_CASE(INT16, Int16Block); + BLOCK_CASE(UINT32, UInt32Block); + BLOCK_CASE(INT32, Int32Block); + BLOCK_CASE(UINT64, UInt64Block); + BLOCK_CASE(INT64, Int64Block); + BLOCK_CASE(FLOAT, Float32Block); + BLOCK_CASE(DOUBLE, Float64Block); + BLOCK_CASE(BOOL, BoolBlock); + BLOCK_CASE(DATETIME, DatetimeBlock); + default: + return Status::NotImplemented("Unsupported block type"); } -} -template <typename InType, typename OutType> -inline void ConvertNumericNullableCast( - const ChunkedArray& data, OutType na_value, OutType* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); +#undef BLOCK_CASE - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = arr->IsNull(i) ? na_value : static_cast<OutType>(in_values[i]); - } - } + return (*block)->Allocate(); } -template <typename T> -inline void ConvertDates(const ChunkedArray& data, T na_value, T* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); - - for (int64_t i = 0; i < arr->length(); ++i) { - // There are 1000 * 60 * 60 * 24 = 86400000ms in a day - *out_values++ = arr->IsNull(i) ? na_value : in_values[i] / 86400000; - } +static inline bool ListTypeSupported(const Type::type type_id) { + switch (type_id) { + case Type::UINT8: + case Type::INT8: + case Type::UINT16: + case Type::INT16: + case Type::UINT32: + case Type::INT32: + case Type::INT64: + case Type::UINT64: + case Type::FLOAT: + case Type::DOUBLE: + case Type::STRING: + case Type::TIMESTAMP: + // The above types are all supported. + return true; + default: + break; } + return false; } -template <typename InType, int SHIFT> -inline void ConvertDatetimeNanos(const ChunkedArray& data, int64_t* out_values) { - for (int c = 0; c < data.num_chunks(); c++) { - const std::shared_ptr<Array> arr = data.chunk(c); - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const InType*>(prim_arr->data()->data()); - - for (int64_t i = 0; i < arr->length(); ++i) { - *out_values++ = arr->IsNull(i) ? kPandasTimestampNull - : (static_cast<int64_t>(in_values[i]) * SHIFT); +static inline Status MakeCategoricalBlock(const std::shared_ptr<DataType>& type, + int64_t num_rows, std::shared_ptr<PandasBlock>* block) { + // All categoricals become a block with a single column + auto dict_type = static_cast<const DictionaryType*>(type.get()); + switch (dict_type->index_type()->type) { + case Type::INT8: + *block = std::make_shared<CategoricalBlock<Type::INT8>>(num_rows); + break; + case Type::INT16: + *block = std::make_shared<CategoricalBlock<Type::INT16>>(num_rows); + break; + case Type::INT32: + *block = std::make_shared<CategoricalBlock<Type::INT32>>(num_rows); + break; + case Type::INT64: + *block = std::make_shared<CategoricalBlock<Type::INT64>>(num_rows); + break; + default: { + std::stringstream ss; + ss << "Categorical index type not implemented: " + << dict_type->index_type()->ToString(); + return Status::NotImplemented(ss.str()); } } + return (*block)->Allocate(); } -class ArrowDeserializer { +// Construct the exact pandas 0.x "BlockManager" memory layout +// +// * For each column determine the correct output pandas type +// * Allocate 2D blocks (ncols x nrows) for each distinct data type in output +// * Allocate block placement arrays +// * Write Arrow columns out into each slice of memory; populate block +// * placement arrays as we go +class DataFrameBlockCreator { public: - ArrowDeserializer(const std::shared_ptr<Column>& col, PyObject* py_ref) - : col_(col), data_(*col->data().get()), py_ref_(py_ref) {} - - Status AllocateOutput(int type) { - PyAcquireGIL lock; - - npy_intp dims[1] = {col_->length()}; - out_ = reinterpret_cast<PyArrayObject*>(PyArray_SimpleNew(1, dims, type)); + DataFrameBlockCreator(const std::shared_ptr<Table>& table) : table_(table) {} - if (out_ == NULL) { - // Error occurred, trust that SimpleNew set the error state - return Status::OK(); - } + Status Convert(int nthreads, PyObject** output) { + column_types_.resize(table_->num_columns()); + column_block_placement_.resize(table_->num_columns()); + type_counts_.clear(); + blocks_.clear(); - set_numpy_metadata(type, col_->type().get(), out_); + RETURN_NOT_OK(CreateBlocks()); + RETURN_NOT_OK(WriteTableToBlocks(nthreads)); - return Status::OK(); + return GetResultList(output); } - template <int TYPE> - Status ConvertValuesZeroCopy(int npy_type, std::shared_ptr<Array> arr) { - typedef typename arrow_traits<TYPE>::T T; - - auto prim_arr = static_cast<arrow::PrimitiveArray*>(arr.get()); - auto in_values = reinterpret_cast<const T*>(prim_arr->data()->data()); + Status CreateBlocks() { + for (int i = 0; i < table_->num_columns(); ++i) { + std::shared_ptr<Column> col = table_->column(i); + PandasBlock::type output_type; - // Zero-Copy. We can pass the data pointer directly to NumPy. - void* data = const_cast<T*>(in_values); + Type::type column_type = col->type()->type; + switch (column_type) { + case Type::BOOL: + output_type = col->null_count() > 0 ? PandasBlock::OBJECT : PandasBlock::BOOL; + break; + case Type::UINT8: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT8; + break; + case Type::INT8: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT8; + break; + case Type::UINT16: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT16; + break; + case Type::INT16: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT16; + break; + case Type::UINT32: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT32; + break; + case Type::INT32: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT32; + break; + case Type::INT64: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::INT64; + break; + case Type::UINT64: + output_type = col->null_count() > 0 ? PandasBlock::DOUBLE : PandasBlock::UINT64; + break; + case Type::FLOAT: + output_type = PandasBlock::FLOAT; + break; + case Type::DOUBLE: + output_type = PandasBlock::DOUBLE; + break; + case Type::STRING: + case Type::BINARY: + output_type = PandasBlock::OBJECT; + break; + case Type::DATE: + output_type = PandasBlock::DATETIME; + break; + case Type::TIMESTAMP: + output_type = PandasBlock::DATETIME; + break; + case Type::LIST: { + auto list_type = std::static_pointer_cast<ListType>(col->type()); + if (!ListTypeSupported(list_type->value_type()->type)) { + std::stringstream ss; + ss << "Not implemented type for lists: " + << list_type->value_type()->ToString(); + return Status::NotImplemented(ss.str()); + } + output_type = PandasBlock::OBJECT; + } break; + case Type::DICTIONARY: + output_type = PandasBlock::CATEGORICAL; + break; + default: + return Status::NotImplemented(col->type()->ToString()); + } - PyAcquireGIL lock; + int block_placement = 0; + if (column_type == Type::DICTIONARY) { + std::shared_ptr<PandasBlock> block; + RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), &block)); + categorical_blocks_[i] = block; + } else { + auto it = type_counts_.find(output_type); + if (it != type_counts_.end()) { + block_placement = it->second; + // Increment count + it->second += 1; + } else { + // Add key to map + type_counts_[output_type] = 1; + } + } - // Zero-Copy. We can pass the data pointer directly to NumPy. - npy_intp dims[1] = {col_->length()}; - out_ = reinterpret_cast<PyArrayObject*>( - PyArray_SimpleNewFromData(1, dims, npy_type, data)); + column_types_[i] = output_type; + column_block_placement_[i] = block_placement; + } - if (out_ == NULL) { - // Error occurred, trust that SimpleNew set the error state - return Status::OK(); + // Create normal non-categorical blocks + for (const auto& it : type_counts_) { + PandasBlock::type type = static_cast<PandasBlock::type>(it.first); + std::shared_ptr<PandasBlock> block; + RETURN_NOT_OK(MakeBlock(type, table_->num_rows(), it.second, &block)); + blocks_[type] = block; } + return Status::OK(); + } - set_numpy_metadata(npy_type, col_->type().get(), out_); + Status WriteTableToBlocks(int nthreads) { + auto WriteColumn = [this](int i) { + std::shared_ptr<Column> col = this->table_->column(i); + PandasBlock::type output_type = this->column_types_[i]; - if (PyArray_SetBaseObject(out_, py_ref_) == -1) { - // Error occurred, trust that SetBaseObject set the error state - return Status::OK(); - } else { - // PyArray_SetBaseObject steals our reference to py_ref_ - Py_INCREF(py_ref_); - } + int rel_placement = this->column_block_placement_[i]; - // Arrow data is immutable. - PyArray_CLEARFLAGS(out_, NPY_ARRAY_WRITEABLE); + std::shared_ptr<PandasBlock> block; + if (output_type == PandasBlock::CATEGORICAL) { + auto it = this->categorical_blocks_.find(i); + if (it == this->blocks_.end()) { + return Status::KeyError("No categorical block allocated"); + } + block = it->second; + } else { + auto it = this->blocks_.find(output_type); + if (it == this->blocks_.end()) { return Status::KeyError("No block allocated"); } + block = it->second; + } + return block->Write(col, i, rel_placement); + }; - return Status::OK(); - } + nthreads = std::min<int>(nthreads, table_->num_columns()); - // ---------------------------------------------------------------------- - // Allocate new array and deserialize. Can do a zero copy conversion for some - // types + if (nthreads == 1) { + for (int i = 0; i < table_->num_columns(); ++i) { + RETURN_NOT_OK(WriteColumn(i)); + } + } else { + std::vector<std::thread> thread_pool; + thread_pool.reserve(nthreads); + std::atomic<int> task_counter(0); - Status Convert(PyObject** out) { -#define CONVERT_CASE(TYPE) \ - case arrow::Type::TYPE: { \ - RETURN_NOT_OK(ConvertValues<arrow::Type::TYPE>()); \ - } break; + std::mutex error_mtx; + bool error_occurred = false; + Status error; - switch (col_->type()->type) { - CONVERT_CASE(BOOL); - CONVERT_CASE(INT8); - CONVERT_CASE(INT16); - CONVERT_CASE(INT32); - CONVERT_CASE(INT64); - CONVERT_CASE(UINT8); - CONVERT_CASE(UINT16); - CONVERT_CASE(UINT32); - CONVERT_CASE(UINT64); - CONVERT_CASE(FLOAT); - CONVERT_CASE(DOUBLE); - CONVERT_CASE(BINARY); - CONVERT_CASE(STRING); - CONVERT_CASE(DATE); - CONVERT_CASE(TIMESTAMP); - default: { - std::stringstream ss; - ss << "Arrow type reading not implemented for " << col_->type()->ToString(); - return Status::NotImplemented(ss.str()); + for (int thread_id = 0; thread_id < nthreads; ++thread_id) { + thread_pool.emplace_back( + [this, &error, &error_occurred, &error_mtx, &task_counter, &WriteColumn]() { + int column_num; + while (!error_occurred) { + column_num = task_counter.fetch_add(1); + if (column_num >= this->table_->num_columns()) { break; } + Status s = WriteColumn(column_num); + if (!s.ok()) { + std::lock_guard<std::mutex> lock(error_mtx); + error_occurred = true; + error = s; + break; + } + } + }); + } + for (auto&& thread : thread_pool) { + thread.join(); } - } - -#undef CONVERT_CASE - *out = reinterpret_cast<PyObject*>(out_); + if (error_occurred) { return error; } + } return Status::OK(); } - template <int TYPE> - inline typename std::enable_if< - (TYPE != arrow::Type::DATE) & arrow_traits<TYPE>::is_numeric_nullable, Status>::type - ConvertValues() { - typedef typename arrow_traits<TYPE>::T T; - int npy_type = arrow_traits<TYPE>::npy_type; + Status GetResultList(PyObject** out) { + PyAcquireGIL lock; - if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != nullptr) { - return ConvertValuesZeroCopy<TYPE>(npy_type, data_.chunk(0)); + auto num_blocks = + static_cast<Py_ssize_t>(blocks_.size() + categorical_blocks_.size()); + PyObject* result = PyList_New(num_blocks); + RETURN_IF_PYERROR(); + + int i = 0; + for (const auto& it : blocks_) { + const std::shared_ptr<PandasBlock> block = it.second; + PyObject* item; + RETURN_NOT_OK(block->GetPyResult(&item)); + if (PyList_SET_ITEM(result, i++, item) < 0) { RETURN_IF_PYERROR(); } } - RETURN_NOT_OK(AllocateOutput(npy_type)); - auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_)); - ConvertNumericNullable<T>(data_, arrow_traits<TYPE>::na_value, out_values); + for (const auto& it : categorical_blocks_) { + const std::shared_ptr<PandasBlock> block = it.second; + PyObject* item; + RETURN_NOT_OK(block->GetPyResult(&item)); + if (PyList_SET_ITEM(result, i++, item) < 0) { RETURN_IF_PYERROR(); } + } + *out = result; return Status::OK(); } - template <int TYPE> - inline typename std::enable_if<TYPE == arrow::Type::DATE, Status>::type - ConvertValues() { - typedef typename arrow_traits<TYPE>::T T; + private: + std::shared_ptr<Table> table_; - RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type)); - auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_)); - ConvertDates<T>(data_, arrow_traits<TYPE>::na_value, out_values); - return Status::OK(); - } + // column num -> block type id + std::vector<PandasBlock::type> column_types_; - // Integer specialization - template <int TYPE> - inline - typename std::enable_if<arrow_traits<TYPE>::is_numeric_not_nullable, Status>::type - ConvertValues() { - typedef typename arrow_traits<TYPE>::T T; - int npy_type = arrow_traits<TYPE>::npy_type; + // column num -> relative placement within internal block + std::vector<int> column_block_placement_; - if (data_.num_chunks() == 1 && data_.null_count() == 0 && py_ref_ != nullptr) { - return ConvertValuesZeroCopy<TYPE>(npy_type, data_.chunk(0)); - } + // block type -> type count + std::unordered_map<int, int> type_counts_; - if (data_.null_count() > 0) { - RETURN_NOT_OK(AllocateOutput(NPY_FLOAT64)); - auto out_values = reinterpret_cast<double*>(PyArray_DATA(out_)); - ConvertIntegerWithNulls<T>(data_, out_values); - } else { - RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type)); - auto out_values = reinterpret_cast<T*>(PyArray_DATA(out_)); - ConvertIntegerNoNullsSameType<T>(data_, out_values); - } + // block type -> block + std::unordered_map<int, std::shared_ptr<PandasBlock>> blocks_; - return Status::OK(); - } + // column number -> categorical block + std::unordered_map<int, std::shared_ptr<PandasBlock>> categorical_blocks_; +}; - // Boolean specialization - template <int TYPE> - inline typename std::enable_if<arrow_traits<TYPE>::is_boolean, Status>::type - ConvertValues() { - if (data_.null_count() > 0) { - RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); - auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)); - RETURN_NOT_OK(ConvertBooleanWithNulls(data_, out_values)); - } else { - RETURN_NOT_OK(AllocateOutput(arrow_traits<TYPE>::npy_type)); - auto out_values = reinterpret_cast<uint8_t*>(PyArray_DATA(out_)); - ConvertBooleanNoNulls(data_, out_values); - } - return Status::OK(); +Status ConvertTableToPandas( + const std::shared_ptr<Table>& table, int nthreads, PyObject** out) { + DataFrameBlockCreator helper(table); + return helper.Convert(nthreads, out); +} + +// ---------------------------------------------------------------------- +// Serialization + +template <int TYPE> +class ArrowSerializer { + public: + ArrowSerializer(arrow::MemoryPool* pool, PyArrayObject* arr, PyArrayObject* mask) + : pool_(pool), arr_(arr), mask_(mask) { + length_ = PyArray_SIZE(arr_); } - // UTF8 strings - template <int TYPE> - inline typename std::enable_if<TYPE == arrow::Type::STRING, Status>::type - ConvertValues() { - RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); - auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)); - return ConvertBinaryLike<arrow::StringArray>(data_, out_values); + void IndicateType(const std::shared_ptr<Field> field) { field_indicator_ = field; } + + Status Convert(std::shared_ptr<Array>* out); + + int stride() const { return PyArray_STRIDES(arr_)[0]; } + + Status InitNullBitmap() { + int null_bytes = BitUtil::BytesForBits(length_); + + null_bitmap_ = std::make_shared<arrow::PoolBuffer>(pool_); + RETURN_NOT_OK(null_bitmap_->Resize(null_bytes)); + + null_bitmap_data_ = null_bitmap_->mutable_data(); + memset(null_bitmap_data_, 0, null_bytes); + + return Status::OK(); } - template <int T2> - inline typename std::enable_if<T2 == arrow::Type::BINARY, Status>::type - ConvertValues() { - RETURN_NOT_OK(AllocateOutput(NPY_OBJECT)); - auto out_values = reinterpret_cast<PyObject**>(PyArray_DATA(out_)); - return ConvertBinaryLike<arrow::BinaryArray>(data_, out_values); + bool is_strided() const { + npy_intp* astrides = PyArray_STRIDES(arr_); + return astrides[0] != PyArray_DESCR(arr_)->elsize; } private: - std::shared_ptr<Column> col_; - const arrow::ChunkedArray& data_; - PyObject* py_ref_; - PyArrayObject* out_; -}; + Status ConvertData(); -Status ConvertArrayToPandas( - const std::shared_ptr<Array>& arr, PyObject* py_ref, PyObject** out) { - static std::string dummy_name = "dummy"; - auto field = std::make_shared<Field>(dummy_name, arr->type()); - auto col = std::make_shared<Column>(field, arr); - return ConvertColumnToPandas(col, py_ref, out); -} + Status ConvertDates(std::shared_ptr<Array>* out) { + PyAcquireGIL lock; -Status ConvertColumnToPandas( - const std::shared_ptr<Column>& col, PyObject* py_ref, PyObject** out) { - ArrowDeserializer converter(col, py_ref); - return converter.Convert(out); -} + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + arrow::TypePtr string_type(new arrow::DateType()); + arrow::DateBuilder date_builder(pool_, string_type); + RETURN_NOT_OK(date_builder.Resize(length_)); -// ---------------------------------------------------------------------- -// pandas 0.x DataFrame conversion internals + Status s; + PyObject* obj; + for (int64_t i = 0; i < length_; ++i) { + obj = objects[i]; + if (PyDate_CheckExact(obj)) { + PyDateTime_Date* pydate = reinterpret_cast<PyDateTime_Date*>(obj); + date_builder.Append(PyDate_to_ms(pydate)); + } else { + date_builder.AppendNull(); + } + } + return date_builder.Finish(out); + } -class PandasBlock { - public: - enum type { - OBJECT, - UINT8, - INT8, - UINT16, - INT16, - UINT32, - INT32, - UINT64, - INT64, - FLOAT, - DOUBLE, - BOOL, - DATETIME, - CATEGORICAL - }; + Status ConvertObjectStrings(std::shared_ptr<Array>* out) { + PyAcquireGIL lock; - PandasBlock(int64_t num_rows, int num_columns) - : num_rows_(num_rows), num_columns_(num_columns) {} - virtual ~PandasBlock() {} + // The output type at this point is inconclusive because there may be bytes + // and unicode mixed in the object array - virtual Status Allocate() = 0; - virtual Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, - int64_t rel_placement) = 0; + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + arrow::TypePtr string_type(new arrow::StringType()); + arrow::StringBuilder string_builder(pool_, string_type); + RETURN_NOT_OK(string_builder.Resize(length_)); - PyObject* block_arr() { return block_arr_.obj(); } + Status s; + bool have_bytes = false; + RETURN_NOT_OK(AppendObjectStrings(string_builder, objects, length_, &have_bytes)); + RETURN_NOT_OK(string_builder.Finish(out)); - PyObject* placement_arr() { return placement_arr_.obj(); } + if (have_bytes) { + const auto& arr = static_cast<const arrow::StringArray&>(*out->get()); + *out = std::make_shared<arrow::BinaryArray>( + arr.length(), arr.offsets(), arr.data(), arr.null_count(), arr.null_bitmap()); + } + return Status::OK(); + } - protected: - Status AllocateNDArray(int npy_type) { + Status ConvertBooleans(std::shared_ptr<Array>* out) { PyAcquireGIL lock; - npy_intp block_dims[2] = {num_columns_, num_rows_}; - PyObject* block_arr = PyArray_SimpleNew(2, block_dims, npy_type); - if (block_arr == NULL) { - // TODO(wesm): propagating Python exception - return Status::OK(); + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + + int nbytes = BitUtil::BytesForBits(length_); + auto data = std::make_shared<arrow::PoolBuffer>(pool_); + RETURN_NOT_OK(data->Resize(nbytes)); + uint8_t* bitmap = data->mutable_data(); + memset(bitmap, 0, nbytes); + + int64_t null_count = 0; + for (int64_t i = 0; i < length_; ++i) { + if (objects[i] == Py_True) { + BitUtil::SetBit(bitmap, i); + BitUtil::SetBit(null_bitmap_data_, i); + } else if (objects[i] != Py_False) { + ++null_count; + } else { + BitUtil::SetBit(null_bitmap_data_, i); + } } - npy_intp placement_dims[1] = {num_columns_}; - PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64); - if (placement_arr == NULL) { - // TODO(wesm): propagating Python exception - return Status::OK(); + *out = std::make_shared<arrow::BooleanArray>(length_, data, null_count, null_bitmap_); + + return Status::OK(); + } + + template <int ITEM_TYPE, typename ArrowType> + Status ConvertTypedLists( + const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out); + +#define LIST_CASE(TYPE, NUMPY_TYPE, ArrowType) \ + case Type::TYPE: { \ + return ConvertTypedLists<NUMPY_TYPE, ::arrow::ArrowType>(field, out); \ + } + + Status ConvertLists(const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { + switch (field->type->type) { + LIST_CASE(UINT8, NPY_UINT8, UInt8Type) + LIST_CASE(INT8, NPY_INT8, Int8Type) + LIST_CASE(UINT16, NPY_UINT16, UInt16Type) + LIST_CASE(INT16, NPY_INT16, Int16Type) + LIST_CASE(UINT32, NPY_UINT32, UInt32Type) + LIST_CASE(INT32, NPY_INT32, Int32Type) + LIST_CASE(UINT64, NPY_UINT64, UInt64Type) + LIST_CASE(INT64, NPY_INT64, Int64Type) + LIST_CASE(TIMESTAMP, NPY_DATETIME, TimestampType) + LIST_CASE(FLOAT, NPY_FLOAT, FloatType) + LIST_CASE(DOUBLE, NPY_DOUBLE, DoubleType) + LIST_CASE(STRING, NPY_OBJECT, StringType) + default: + return Status::TypeError("Unknown list item type"); } - block_arr_.reset(block_arr); - placement_arr_.reset(placement_arr); - - block_data_ = reinterpret_cast<uint8_t*>( - PyArray_DATA(reinterpret_cast<PyArrayObject*>(block_arr))); - - placement_data_ = reinterpret_cast<int64_t*>( - PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr))); - - return Status::OK(); + return Status::TypeError("Unknown list type"); } - int64_t num_rows_; - int num_columns_; - - OwnedRef block_arr_; - uint8_t* block_data_; + Status MakeDataType(std::shared_ptr<DataType>* out); - // ndarray<int32> - OwnedRef placement_arr_; - int64_t* placement_data_; + arrow::MemoryPool* pool_; - DISALLOW_COPY_AND_ASSIGN(PandasBlock); -}; + PyArrayObject* arr_; + PyArrayObject* mask_; -#define CONVERTLISTSLIKE_CASE(ArrowType, ArrowEnum) \ - case Type::ArrowEnum: \ - RETURN_NOT_OK((ConvertListsLike<::arrow::ArrowType>(col, out_buffer))); \ - break; + int64_t length_; -class ObjectBlock : public PandasBlock { - public: - using PandasBlock::PandasBlock; - virtual ~ObjectBlock() {} + std::shared_ptr<Field> field_indicator_; + std::shared_ptr<arrow::Buffer> data_; + std::shared_ptr<arrow::ResizableBuffer> null_bitmap_; + uint8_t* null_bitmap_data_; +}; - Status Allocate() override { return AllocateNDArray(NPY_OBJECT); } +// Returns null count +static int64_t MaskToBitmap(PyArrayObject* mask, int64_t length, uint8_t* bitmap) { + int64_t null_count = 0; + const uint8_t* mask_values = static_cast<const uint8_t*>(PyArray_DATA(mask)); + // TODO(wesm): strided null mask + for (int i = 0; i < length; ++i) { + if (mask_values[i]) { + ++null_count; + } else { + BitUtil::SetBit(bitmap, i); + } + } + return null_count; +} - Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, - int64_t rel_placement) override { - Type::type type = col->type()->type; +template <int TYPE> +inline Status ArrowSerializer<TYPE>::MakeDataType(std::shared_ptr<DataType>* out) { + out->reset(new typename npy_traits<TYPE>::TypeClass()); + return Status::OK(); +} - PyObject** out_buffer = - reinterpret_cast<PyObject**>(block_data_) + rel_placement * num_rows_; +template <> +inline Status ArrowSerializer<NPY_DATETIME>::MakeDataType( + std::shared_ptr<DataType>* out) { + PyArray_Descr* descr = PyArray_DESCR(arr_); + auto date_dtype = reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(descr->c_metadata); + arrow::TimestampType::Unit unit; - const ChunkedArray& data = *col->data().get(); + switch (date_dtype->meta.base) { + case NPY_FR_s: + unit = arrow::TimestampType::Unit::SECOND; + break; + case NPY_FR_ms: + unit = arrow::TimestampType::Unit::MILLI; + break; + case NPY_FR_us: + unit = arrow::TimestampType::Unit::MICRO; + break; + case NPY_FR_ns: + unit = arrow::TimestampType::Unit::NANO; + break; + default: + return Status::Invalid("Unknown NumPy datetime unit"); + } - if (type == Type::BOOL) { - RETURN_NOT_OK(ConvertBooleanWithNulls(data, out_buffer)); - } else if (type == Type::BINARY) { - RETURN_NOT_OK(ConvertBinaryLike<arrow::BinaryArray>(data, out_buffer)); - } else if (type == Type::STRING) { - RETURN_NOT_OK(ConvertBinaryLike<arrow::StringArray>(data, out_buffer)); - } else if (type == Type::LIST) { - auto list_type = std::static_pointer_cast<ListType>(col->type()); - switch (list_type->value_type()->type) { - CONVERTLISTSLIKE_CASE(UInt8Type, UINT8) - CONVERTLISTSLIKE_CASE(Int8Type, INT8) - CONVERTLISTSLIKE_CASE(UInt16Type, UINT16) - CONVERTLISTSLIKE_CASE(Int16Type, INT16) - CONVERTLISTSLIKE_CASE(UInt32Type, UINT32) - CONVERTLISTSLIKE_CASE(Int32Type, INT32) - CONVERTLISTSLIKE_CASE(UInt64Type, UINT64) - CONVERTLISTSLIKE_CASE(Int64Type, INT64) - CONVERTLISTSLIKE_CASE(TimestampType, TIMESTAMP) - CONVERTLISTSLIKE_CASE(FloatType, FLOAT) - CONVERTLISTSLIKE_CASE(DoubleType, DOUBLE) - CONVERTLISTSLIKE_CASE(StringType, STRING) - default: { - std::stringstream ss; - ss << "Not implemented type for lists: " << list_type->value_type()->ToString(); - return Status::NotImplemented(ss.str()); - } - } - } else { - std::stringstream ss; - ss << "Unsupported type for object array output: " << col->type()->ToString(); - return Status::NotImplemented(ss.str()); - } + out->reset(new arrow::TimestampType(unit)); + return Status::OK(); +} - placement_data_[rel_placement] = abs_placement; - return Status::OK(); - } -}; +template <int TYPE> +inline Status ArrowSerializer<TYPE>::Convert(std::shared_ptr<Array>* out) { + typedef npy_traits<TYPE> traits; -template <int ARROW_TYPE, typename C_TYPE> -class IntBlock : public PandasBlock { - public: - using PandasBlock::PandasBlock; + if (mask_ != nullptr || traits::supports_nulls) { RETURN_NOT_OK(InitNullBitmap()); } - Status Allocate() override { - return AllocateNDArray(arrow_traits<ARROW_TYPE>::npy_type); + int64_t null_count = 0; + if (mask_ != nullptr) { + null_count = MaskToBitmap(mask_, length_, null_bitmap_data_); + } else if (traits::supports_nulls) { + null_count = ValuesToBitmap<TYPE>(PyArray_DATA(arr_), length_, null_bitmap_data_); } - Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, - int64_t rel_placement) override { - Type::type type = col->type()->type; - - C_TYPE* out_buffer = - reinterpret_cast<C_TYPE*>(block_data_) + rel_placement * num_rows_; + RETURN_NOT_OK(ConvertData()); + std::shared_ptr<DataType> type; + RETURN_NOT_OK(MakeDataType(&type)); + RETURN_NOT_OK(MakePrimitiveArray(type, length_, data_, null_count, null_bitmap_, out)); + return Status::OK(); +} - const ChunkedArray& data = *col->data().get(); +template <> +inline Status ArrowSerializer<NPY_OBJECT>::Convert(std::shared_ptr<Array>* out) { + // Python object arrays are annoying, since we could have one of: + // + // * Strings + // * Booleans with nulls + // * Mixed type (not supported at the moment by arrow format) + // + // Additionally, nulls may be encoded either as np.nan or None. So we have to + // do some type inference and conversion - if (type != ARROW_TYPE) { return Status::NotImplemented(col->type()->ToString()); } + RETURN_NOT_OK(InitNullBitmap()); - ConvertIntegerNoNullsSameType<C_TYPE>(data, out_buffer); - placement_data_[rel_placement] = abs_placement; - return Status::OK(); + // TODO: mask not supported here + const PyObject** objects = reinterpret_cast<const PyObject**>(PyArray_DATA(arr_)); + { + PyAcquireGIL lock; + PyDateTime_IMPORT; } -}; - -using UInt8Block = IntBlock<Type::UINT8, uint8_t>; -using Int8Block = IntBlock<Type::INT8, int8_t>; -using UInt16Block = IntBlock<Type::UINT16, uint16_t>; -using Int16Block = IntBlock<Type::INT16, int16_t>; -using UInt32Block = IntBlock<Type::UINT32, uint32_t>; -using Int32Block = IntBlock<Type::INT32, int32_t>; -using UInt64Block = IntBlock<Type::UINT64, uint64_t>; -using Int64Block = IntBlock<Type::INT64, int64_t>; -class Float32Block : public PandasBlock { - public: - using PandasBlock::PandasBlock; + if (field_indicator_) { + switch (field_indicator_->type->type) { + case Type::STRING: + return ConvertObjectStrings(out); + case Type::BOOL: + return ConvertBooleans(out); + case Type::DATE: + return ConvertDates(out); + case Type::LIST: { + auto list_field = static_cast<ListType*>(field_indicator_->type.get()); + return ConvertLists(list_field->value_field(), out); + } + default: + return Status::TypeError("No known conversion to Arrow type"); + } + } else { + for (int64_t i = 0; i < length_; ++i) { + if (PyObject_is_null(objects[i])) { + continue; + } else if (PyObject_is_string(objects[i])) { + return ConvertObjectStrings(out); + } else if (PyBool_Check(objects[i])) { + return ConvertBooleans(out); + } else if (PyDate_CheckExact(objects[i])) { + return ConvertDates(out); + } else { + return Status::TypeError("unhandled python type"); + } + } + } - Status Allocate() override { return AllocateNDArray(NPY_FLOAT32); } + return Status::TypeError("Unable to infer type of object array, were all null"); +} - Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, - int64_t rel_placement) override { - Type::type type = col->type()->type; +template <int TYPE> +inline Status ArrowSerializer<TYPE>::ConvertData() { + // TODO(wesm): strided arrays + if (is_strided()) { return Status::Invalid("no support for strided data yet"); } - if (type != Type::FLOAT) { return Status::NotImplemented(col->type()->ToString()); } + data_ = std::make_shared<NumPyBuffer>(arr_); + return Status::OK(); +} - float* out_buffer = reinterpret_cast<float*>(block_data_) + rel_placement * num_rows_; +template <> +inline Status ArrowSerializer<NPY_BOOL>::ConvertData() { + if (is_strided()) { return Status::Invalid("no support for strided data yet"); } - ConvertNumericNullable<float>(*col->data().get(), NAN, out_buffer); - placement_data_[rel_placement] = abs_placement; - return Status::OK(); - } -}; + int nbytes = BitUtil::BytesForBits(length_); + auto buffer = std::make_shared<arrow::PoolBuffer>(pool_); + RETURN_NOT_OK(buffer->Resize(nbytes)); -class Float64Block : public PandasBlock { - public: - using PandasBlock::PandasBlock; + const uint8_t* values = reinterpret_cast<const uint8_t*>(PyArray_DATA(arr_)); - Status Allocate() override { return AllocateNDArray(NPY_FLOAT64); } + uint8_t* bitmap = buffer->mutable_data(); - Status Write(const std::shared_ptr<Column>& col, int64_t abs_placement, - int64_t rel_placement) override { - Type::type type = col->type()->type; + memset(bitmap, 0, nbytes); + for (int i = 0; i < length_; ++i) { + if (values[i] > 0) { BitUtil::SetBit(bitmap, i); } + } - double* out_buffer = - reinterpret_cast<double*>(block_data_) + rel_placement * num_rows_; + data_ = buffer; - const ChunkedArray& data = *col->data().get(); + return Status::OK(); +} -#define INTEGER_CASE(IN_TYPE) \ - ConvertIntegerWithNulls<IN_TYPE>(data, out_buffer); \ - break; +template <int TYPE> +template <int ITEM_TYPE, typename ArrowType> +inline Status ArrowSerializer<TYPE>::ConvertTypedLists( + const std::shared_ptr<Field>& field, std::shared_ptr<Array>* out) { + typedef npy_traits<ITEM_TYPE> traits; + typedef typename traits::value_type T; + typedef typename traits::BuilderClass BuilderT; - switch (type) { - case Type::UINT8: - INTEGER_CASE(uint8_t); - case Type::INT8: - INTEGER_CASE(int8_t); - case Type::UINT16: - INTEGER_CASE(uint16_t); - case Type::INT16: - INTEGER_CASE(int16_t); - case Type::UINT32: - INTEGER_CASE(uint32_t); - case Type::INT32: - INTEGER_CASE(int32_t); - case Type::UINT64: - INTEGER_CASE(uint64_t); - case Type::INT64: - INTEGER_CASE(int64_t); - case Type::FLOAT: - ConvertNumericNullableCast<float, double>(data, NAN, out_buffer); - break; - case Type::DOUBLE: - ConvertNumericNullable<double>(data, NAN, out_buffer); - break; - default: - return Status::NotImplemented(col->type()->ToString()); - } + auto value_builder = std::make_shared<BuilderT>(pool_, field->type); + ListBuilder list_builder(pool_, value_builder); + PyObject** objects = reinterpret_cast<PyObject**>(PyArray_DATA(arr_)); + for (int64_t i = 0; i < length_; ++i) { + if (PyObject_is_null(objects[i])) { + RETURN_NOT_OK(list_builder.AppendNull()); + } else if (PyArray_Check(objects[i])) { + auto numpy_array = reinterpret_cast<PyArrayObject*>(objects[i]); + RETURN_NOT_OK(list_builder.Append(true)); -#undef INTEGER_CASE + // TODO(uwe): Support more complex numpy array structures + RETURN_NOT_OK(CheckFlatNumpyArray(numpy_array, ITEM_TYPE)); - placement_data_[rel_placement] = abs_placement; - return Status::OK(); + int32_t size = PyArray_DIM(numpy_array, 0); + auto data = reinterpret_cast<const T*>(PyArray_DATA(numpy_arra
<TRUNCATED>