Repository: arrow Updated Branches: refs/heads/master 11ebe9387 -> 5aca7b669
ARROW-1480: [Python] Improve performance of serializing sets Author: Philipp Moritz <[email protected]> Author: Wes McKinney <[email protected]> Closes #1060 from pcmoritz/serialize-sets and squashes the following commits: 86707aaa [Wes McKinney] Update RecordBatch::column docstring now that columns are being cached cb451aab [Wes McKinney] Incorporate code review comments. Add internal caching of boxed arrays to StructArray, UnionArray, RecordBatch 89f191b0 [Philipp Moritz] fix linting 3d335e5a [Philipp Moritz] fix c705e435 [Philipp Moritz] deserialization a59cb989 [Philipp Moritz] support serializing sets Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/5aca7b66 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/5aca7b66 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/5aca7b66 Branch: refs/heads/master Commit: 5aca7b669530b20121b8dda566ddc20ac1dadbeb Parents: 11ebe93 Author: Philipp Moritz <[email protected]> Authored: Fri Sep 8 18:31:59 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Fri Sep 8 18:31:59 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/array-test.cc | 2 + cpp/src/arrow/array.cc | 21 ++-- cpp/src/arrow/array.h | 7 ++ cpp/src/arrow/python/arrow_to_python.cc | 155 ++++++++++++++---------- cpp/src/arrow/python/python_to_arrow.cc | 55 ++++++--- cpp/src/arrow/table.cc | 26 ++-- cpp/src/arrow/table.h | 13 +- python/pyarrow/tests/test_serialization.py | 1 + 8 files changed, 183 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/array-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc index 5d9eb18..c92c23d 100644 --- a/cpp/src/arrow/array-test.cc +++ b/cpp/src/arrow/array-test.cc @@ -2460,6 +2460,8 @@ TEST(TestUnionArrayAdHoc, TestSliceEquals) { auto CheckUnion = [&size](std::shared_ptr<Array> array) { std::shared_ptr<Array> slice, slice2; slice = array->Slice(2); + ASSERT_EQ(size - 2, slice->length()); + slice2 = array->Slice(2); ASSERT_EQ(size - 2, slice->length()); http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/array.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc index 34f0868..2d37274 100644 --- a/cpp/src/arrow/array.cc +++ b/cpp/src/arrow/array.cc @@ -329,6 +329,7 @@ std::string DecimalArray::FormatValue(int64_t i) const { StructArray::StructArray(const std::shared_ptr<ArrayData>& data) { DCHECK_EQ(data->type->id(), Type::STRUCT); SetData(data); + boxed_fields_.resize(data->child_data.size()); } StructArray::StructArray(const std::shared_ptr<DataType>& type, int64_t length, @@ -341,12 +342,14 @@ StructArray::StructArray(const std::shared_ptr<DataType>& type, int64_t length, for (const auto& child : children) { data_->child_data.push_back(child->data()); } + boxed_fields_.resize(children.size()); } -std::shared_ptr<Array> StructArray::field(int pos) const { - std::shared_ptr<Array> result; - DCHECK(internal::MakeArray(data_->child_data[pos], &result).ok()); - return result; +std::shared_ptr<Array> StructArray::field(int i) const { + if (!boxed_fields_[i]) { + DCHECK(internal::MakeArray(data_->child_data[i], &boxed_fields_[i]).ok()); + } + return boxed_fields_[i]; } // ---------------------------------------------------------------------- @@ -362,6 +365,7 @@ void UnionArray::SetData(const std::shared_ptr<ArrayData>& data) { raw_value_offsets_ = value_offsets == nullptr ? nullptr : reinterpret_cast<const int32_t*>(value_offsets->data()); + boxed_fields_.resize(data->child_data.size()); } UnionArray::UnionArray(const std::shared_ptr<ArrayData>& data) { @@ -384,10 +388,11 @@ UnionArray::UnionArray(const std::shared_ptr<DataType>& type, int64_t length, SetData(internal_data); } -std::shared_ptr<Array> UnionArray::child(int pos) const { - std::shared_ptr<Array> result; - DCHECK(internal::MakeArray(data_->child_data[pos], &result).ok()); - return result; +std::shared_ptr<Array> UnionArray::child(int i) const { + if (!boxed_fields_[i]) { + DCHECK(internal::MakeArray(data_->child_data[i], &boxed_fields_[i]).ok()); + } + return boxed_fields_[i]; } // ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/array.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 3faff71..bfeedd2 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -555,6 +555,10 @@ class ARROW_EXPORT StructArray : public Array { // Return a shared pointer in case the requestor desires to share ownership // with this array. std::shared_ptr<Array> field(int pos) const; + + private: + // For caching boxed child data + mutable std::vector<std::shared_ptr<Array>> boxed_fields_; }; // ---------------------------------------------------------------------- @@ -592,6 +596,9 @@ class ARROW_EXPORT UnionArray : public Array { const type_id_t* raw_type_ids_; const int32_t* raw_value_offsets_; + + // For caching boxed child data + mutable std::vector<std::shared_ptr<Array>> boxed_fields_; }; // ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/python/arrow_to_python.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc index b127971..bc12ba7 100644 --- a/cpp/src/arrow/python/arrow_to_python.cc +++ b/cpp/src/arrow/python/arrow_to_python.cc @@ -37,26 +37,31 @@ namespace py { Status CallDeserializeCallback(PyObject* context, PyObject* value, PyObject** deserialized_object); -Status DeserializeTuple(PyObject* context, std::shared_ptr<Array> array, - int64_t start_idx, int64_t stop_idx, PyObject* base, +Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx, + int64_t stop_idx, PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** out); -Status DeserializeList(PyObject* context, std::shared_ptr<Array> array, int64_t start_idx, +Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx, int64_t stop_idx, PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** out); -Status DeserializeDict(PyObject* context, std::shared_ptr<Array> array, int64_t start_idx, +Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx, + int64_t stop_idx, PyObject* base, + const std::vector<std::shared_ptr<Tensor>>& tensors, + PyObject** out); + +Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx, int64_t stop_idx, PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** out) { - auto data = std::dynamic_pointer_cast<StructArray>(array); + const auto& data = static_cast<const StructArray&>(array); ScopedRef keys, vals; ScopedRef result(PyDict_New()); - RETURN_NOT_OK(DeserializeList(context, data->field(0), start_idx, stop_idx, base, + RETURN_NOT_OK(DeserializeList(context, *data.field(0), start_idx, stop_idx, base, tensors, keys.ref())); - RETURN_NOT_OK(DeserializeList(context, data->field(1), start_idx, stop_idx, base, + RETURN_NOT_OK(DeserializeList(context, *data.field(1), start_idx, stop_idx, base, tensors, vals.ref())); for (int64_t i = start_idx; i < stop_idx; ++i) { // PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem. @@ -75,11 +80,10 @@ Status DeserializeDict(PyObject* context, std::shared_ptr<Array> array, int64_t return Status::OK(); } -Status DeserializeArray(std::shared_ptr<Array> array, int64_t offset, PyObject* base, +Status DeserializeArray(const Array& array, int64_t offset, PyObject* base, const std::vector<std::shared_ptr<arrow::Tensor>>& tensors, PyObject** out) { - DCHECK(array); - int32_t index = std::static_pointer_cast<Int32Array>(array)->Value(offset); + int32_t index = static_cast<const Int32Array&>(array).Value(offset); RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out)); // Mark the array as immutable ScopedRef flags(PyObject_GetAttrString(*out, "flags")); @@ -90,54 +94,51 @@ Status DeserializeArray(std::shared_ptr<Array> array, int64_t offset, PyObject* return Status::OK(); } -Status GetValue(PyObject* context, std::shared_ptr<Array> arr, int64_t index, - int32_t type, PyObject* base, - const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** result) { - switch (arr->type()->id()) { +Status GetValue(PyObject* context, const Array& arr, int64_t index, int32_t type, + PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors, + PyObject** result) { + switch (arr.type()->id()) { case Type::BOOL: - *result = - PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index)); + *result = PyBool_FromLong(static_cast<const BooleanArray&>(arr).Value(index)); return Status::OK(); case Type::INT64: - *result = - PyLong_FromSsize_t(std::static_pointer_cast<Int64Array>(arr)->Value(index)); + *result = PyLong_FromSsize_t(static_cast<const Int64Array&>(arr).Value(index)); return Status::OK(); case Type::BINARY: { int32_t nchars; - const uint8_t* str = - std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars); + const uint8_t* str = static_cast<const BinaryArray&>(arr).GetValue(index, &nchars); *result = PyBytes_FromStringAndSize(reinterpret_cast<const char*>(str), nchars); return CheckPyError(); } case Type::STRING: { int32_t nchars; - const uint8_t* str = - std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars); + const uint8_t* str = static_cast<const StringArray&>(arr).GetValue(index, &nchars); *result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars); return CheckPyError(); } case Type::FLOAT: - *result = - PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index)); + *result = PyFloat_FromDouble(static_cast<const FloatArray&>(arr).Value(index)); return Status::OK(); case Type::DOUBLE: - *result = - PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index)); + *result = PyFloat_FromDouble(static_cast<const DoubleArray&>(arr).Value(index)); return Status::OK(); case Type::STRUCT: { - auto s = std::static_pointer_cast<StructArray>(arr); - auto l = std::static_pointer_cast<ListArray>(s->field(0)); - if (s->type()->child(0)->name() == "list") { - return DeserializeList(context, l->values(), l->value_offset(index), - l->value_offset(index + 1), base, tensors, result); - } else if (s->type()->child(0)->name() == "tuple") { - return DeserializeTuple(context, l->values(), l->value_offset(index), - l->value_offset(index + 1), base, tensors, result); - } else if (s->type()->child(0)->name() == "dict") { - return DeserializeDict(context, l->values(), l->value_offset(index), - l->value_offset(index + 1), base, tensors, result); + const auto& s = static_cast<const StructArray&>(arr); + const auto& l = static_cast<const ListArray&>(*s.field(0)); + if (s.type()->child(0)->name() == "list") { + return DeserializeList(context, *l.values(), l.value_offset(index), + l.value_offset(index + 1), base, tensors, result); + } else if (s.type()->child(0)->name() == "tuple") { + return DeserializeTuple(context, *l.values(), l.value_offset(index), + l.value_offset(index + 1), base, tensors, result); + } else if (s.type()->child(0)->name() == "dict") { + return DeserializeDict(context, *l.values(), l.value_offset(index), + l.value_offset(index + 1), base, tensors, result); + } else if (s.type()->child(0)->name() == "set") { + return DeserializeSet(context, *l.values(), l.value_offset(index), + l.value_offset(index + 1), base, tensors, result); } else { - DCHECK(false) << "unexpected StructArray type " << s->type()->child(0)->name(); + DCHECK(false) << "unexpected StructArray type " << s.type()->child(0)->name(); } } // We use an Int32Builder here to distinguish the tensor indices from @@ -151,42 +152,72 @@ Status GetValue(PyObject* context, std::shared_ptr<Array> arr, int64_t index, return Status::OK(); } -#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN) \ - auto data = std::dynamic_pointer_cast<UnionArray>(array); \ - int64_t size = array->length(); \ - ScopedRef result(CREATE_FN(stop_idx - start_idx)); \ - auto types = std::make_shared<Int8Array>(size, data->type_ids()); \ - auto offsets = std::make_shared<Int32Array>(size, data->value_offsets()); \ - for (int64_t i = start_idx; i < stop_idx; ++i) { \ - if (data->IsNull(i)) { \ - Py_INCREF(Py_None); \ - SET_ITEM_FN(result.get(), i - start_idx, Py_None); \ - } else { \ - int64_t offset = offsets->Value(i); \ - int8_t type = types->Value(i); \ - std::shared_ptr<Array> arr = data->child(type); \ - PyObject* value; \ - RETURN_NOT_OK(GetValue(context, arr, offset, type, base, tensors, &value)); \ - SET_ITEM_FN(result.get(), i - start_idx, value); \ - } \ - } \ - *out = result.release(); \ +#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN) \ + const auto& data = static_cast<const UnionArray&>(array); \ + int64_t size = array.length(); \ + ScopedRef result(CREATE_FN(stop_idx - start_idx)); \ + auto types = std::make_shared<Int8Array>(size, data.type_ids()); \ + auto offsets = std::make_shared<Int32Array>(size, data.value_offsets()); \ + for (int64_t i = start_idx; i < stop_idx; ++i) { \ + if (data.IsNull(i)) { \ + Py_INCREF(Py_None); \ + SET_ITEM_FN(result.get(), i - start_idx, Py_None); \ + } else { \ + int64_t offset = offsets->Value(i); \ + int8_t type = types->Value(i); \ + PyObject* value; \ + RETURN_NOT_OK( \ + GetValue(context, *data.child(type), offset, type, base, tensors, &value)); \ + SET_ITEM_FN(result.get(), i - start_idx, value); \ + } \ + } \ + *out = result.release(); \ return Status::OK() -Status DeserializeList(PyObject* context, std::shared_ptr<Array> array, int64_t start_idx, +Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx, int64_t stop_idx, PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** out) { DESERIALIZE_SEQUENCE(PyList_New, PyList_SET_ITEM); } -Status DeserializeTuple(PyObject* context, std::shared_ptr<Array> array, - int64_t start_idx, int64_t stop_idx, PyObject* base, +Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx, + int64_t stop_idx, PyObject* base, const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** out) { DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM); } +Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx, + int64_t stop_idx, PyObject* base, + const std::vector<std::shared_ptr<Tensor>>& tensors, + PyObject** out) { + const auto& data = static_cast<const UnionArray&>(array); + int64_t size = array.length(); + ScopedRef result(PySet_New(nullptr)); + auto types = std::make_shared<Int8Array>(size, data.type_ids()); + auto offsets = std::make_shared<Int32Array>(size, data.value_offsets()); + for (int64_t i = start_idx; i < stop_idx; ++i) { + if (data.IsNull(i)) { + Py_INCREF(Py_None); + if (PySet_Add(result.get(), Py_None) < 0) { + RETURN_IF_PYERROR(); + } + } else { + int64_t offset = offsets->Value(i); + int8_t type = types->Value(i); + PyObject* value; + RETURN_NOT_OK( + GetValue(context, *data.child(type), offset, type, base, tensors, &value)); + if (PySet_Add(result.get(), value) < 0) { + RETURN_IF_PYERROR(); + } + } + } + *out = result.release(); + return Status::OK(); +} + Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) { int64_t offset; int64_t bytes_read; @@ -213,7 +244,7 @@ Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObject* base, PyObject** out) { PyAcquireGIL lock; - return DeserializeList(context, obj.batch->column(0), 0, obj.batch->num_rows(), base, + return DeserializeList(context, *obj.batch->column(0), 0, obj.batch->num_rows(), base, obj.tensors, out); } http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/python/python_to_arrow.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index 65e5f6a..c57091f 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -62,7 +62,8 @@ class SequenceBuilder { tensor_indices_(::arrow::int32(), pool), list_offsets_({0}), tuple_offsets_({0}), - dict_offsets_({0}) {} + dict_offsets_({0}), + set_offsets_({0}) {} /// Appending a none to the sequence Status AppendNone() { @@ -163,6 +164,12 @@ class SequenceBuilder { return Status::OK(); } + Status AppendSet(Py_ssize_t size) { + RETURN_NOT_OK(Update(set_offsets_.size() - 1, &set_tag_)); + set_offsets_.push_back(set_offsets_.back() + static_cast<int32_t>(size)); + return Status::OK(); + } + template <typename BuilderType> Status AddElement(const int8_t tag, BuilderType* out) { if (tag != -1) { @@ -200,7 +207,7 @@ class SequenceBuilder { /// Finish building the sequence and return the result. /// Input arrays may be nullptr Status Finish(const Array* list_data, const Array* tuple_data, const Array* dict_data, - std::shared_ptr<Array>* out) { + const Array* set_data, std::shared_ptr<Array>* out) { fields_.resize(num_tags_); children_.resize(num_tags_); @@ -215,6 +222,7 @@ class SequenceBuilder { RETURN_NOT_OK(AddSubsequence(list_tag_, list_data, list_offsets_, "list")); RETURN_NOT_OK(AddSubsequence(tuple_tag_, tuple_data, tuple_offsets_, "tuple")); RETURN_NOT_OK(AddSubsequence(dict_tag_, dict_data, dict_offsets_, "dict")); + RETURN_NOT_OK(AddSubsequence(set_tag_, set_data, set_offsets_, "set")); auto type = ::arrow::union_(fields_, type_ids_, UnionMode::DENSE); out->reset(new UnionArray(type, types_.length(), children_, types_.data(), @@ -246,6 +254,7 @@ class SequenceBuilder { std::vector<int32_t> list_offsets_; std::vector<int32_t> tuple_offsets_; std::vector<int32_t> dict_offsets_; + std::vector<int32_t> set_offsets_; // Tags for members of the sequence. If they are set to -1 it means // they are not used and will not part be of the metadata when we call @@ -263,6 +272,7 @@ class SequenceBuilder { int8_t list_tag_ = -1; int8_t tuple_tag_ = -1; int8_t dict_tag_ = -1; + int8_t set_tag_ = -1; int8_t num_tags_ = 0; @@ -297,12 +307,14 @@ class DictBuilder { /// value list of the dictionary Status Finish(const Array* key_tuple_data, const Array* key_dict_data, const Array* val_list_data, const Array* val_tuple_data, - const Array* val_dict_data, std::shared_ptr<Array>* out) { - // lists and dicts can't be keys of dicts in Python, that is why for + const Array* val_dict_data, const Array* val_set_data, + std::shared_ptr<Array>* out) { + // lists and sets can't be keys of dicts in Python, that is why for // the keys we do not need to collect sublists std::shared_ptr<Array> keys, vals; - RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, key_dict_data, &keys)); - RETURN_NOT_OK(vals_.Finish(val_list_data, val_tuple_data, val_dict_data, &vals)); + RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, key_dict_data, nullptr, &keys)); + RETURN_NOT_OK( + vals_.Finish(val_list_data, val_tuple_data, val_dict_data, val_set_data, &vals)); auto keys_field = std::make_shared<Field>("keys", keys->type()); auto vals_field = std::make_shared<Field>("vals", vals->type()); auto type = std::make_shared<StructType>( @@ -411,7 +423,8 @@ Status AppendScalar(PyObject* obj, SequenceBuilder* builder) { Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, std::vector<PyObject*>* sublists, std::vector<PyObject*>* subtuples, - std::vector<PyObject*>* subdicts, std::vector<PyObject*>* tensors_out) { + std::vector<PyObject*>* subdicts, std::vector<PyObject*>* subsets, + std::vector<PyObject*>* tensors_out) { // The bool case must precede the int case (PyInt_Check passes for bools) if (PyBool_Check(elem)) { RETURN_NOT_OK(builder->AppendBool(elem == Py_True)); @@ -463,6 +476,9 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder, } else if (PyTuple_CheckExact(elem)) { RETURN_NOT_OK(builder->AppendTuple(PyTuple_Size(elem))); subtuples->push_back(elem); + } else if (PySet_Check(elem)) { + RETURN_NOT_OK(builder->AppendSet(PySet_Size(elem))); + subsets->push_back(elem); } else if (PyArray_IsScalar(elem, Generic)) { RETURN_NOT_OK(AppendScalar(elem, builder)); } else if (PyArray_Check(elem)) { @@ -522,14 +538,14 @@ Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences, "recursively."); } SequenceBuilder builder(nullptr); - std::vector<PyObject*> sublists, subtuples, subdicts; + std::vector<PyObject*> sublists, subtuples, subdicts, subsets; for (const auto& sequence : sequences) { ScopedRef iterator(PyObject_GetIter(sequence)); RETURN_IF_PYERROR(); ScopedRef item; while (item.reset(PyIter_Next(iterator.get())), item.get()) { RETURN_NOT_OK(Append(context, item.get(), &builder, &sublists, &subtuples, - &subdicts, tensors_out)); + &subdicts, &subsets, tensors_out)); } } std::shared_ptr<Array> list; @@ -547,7 +563,12 @@ Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences, RETURN_NOT_OK( SerializeDict(context, subdicts, recursion_depth + 1, &dict, tensors_out)); } - return builder.Finish(list.get(), tuple.get(), dict.get(), out); + std::shared_ptr<Array> set; + if (subsets.size() > 0) { + RETURN_NOT_OK( + SerializeSequences(context, subsets, recursion_depth + 1, &set, tensors_out)); + } + return builder.Finish(list.get(), tuple.get(), dict.get(), set.get(), out); } Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts, @@ -559,16 +580,17 @@ Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts, "This object exceeds the maximum recursion depth. It may contain itself " "recursively."); } - std::vector<PyObject*> key_tuples, key_dicts, val_lists, val_tuples, val_dicts, dummy; + std::vector<PyObject*> key_tuples, key_dicts, val_lists, val_tuples, val_dicts, + val_sets, dummy; for (const auto& dict : dicts) { PyObject *key, *value; Py_ssize_t pos = 0; while (PyDict_Next(dict, &pos, &key, &value)) { RETURN_NOT_OK(Append(context, key, &result.keys(), &dummy, &key_tuples, &key_dicts, - tensors_out)); + &dummy, tensors_out)); DCHECK_EQ(dummy.size(), 0); RETURN_NOT_OK(Append(context, value, &result.vals(), &val_lists, &val_tuples, - &val_dicts, tensors_out)); + &val_dicts, &val_sets, tensors_out)); } } std::shared_ptr<Array> key_tuples_arr; @@ -596,9 +618,14 @@ Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts, RETURN_NOT_OK(SerializeDict(context, val_dicts, recursion_depth + 1, &val_dict_arr, tensors_out)); } + std::shared_ptr<Array> val_set_arr; + if (val_sets.size() > 0) { + RETURN_NOT_OK(SerializeSequences(context, val_sets, recursion_depth + 1, &val_set_arr, + tensors_out)); + } RETURN_NOT_OK(result.Finish(key_tuples_arr.get(), key_dicts_arr.get(), val_list_arr.get(), val_tuples_arr.get(), - val_dict_arr.get(), out)); + val_dict_arr.get(), val_set_arr.get(), out)); // This block is used to decrement the reference counts of the results // returned by the serialization callback, which is called in SerializeArray, http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/table.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index ae48698..8c7c4e2 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -160,9 +160,15 @@ void AssertBatchValid(const RecordBatch& batch) { } } +RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows) + : schema_(schema), num_rows_(num_rows) { + boxed_columns_.resize(schema->num_fields()); +} + RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, const std::vector<std::shared_ptr<Array>>& columns) - : schema_(schema), num_rows_(num_rows), columns_(columns.size()) { + : RecordBatch(schema, num_rows) { + columns_.resize(columns.size()); for (size_t i = 0; i < columns.size(); ++i) { columns_[i] = columns[i]->data(); } @@ -170,7 +176,8 @@ RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, std::vector<std::shared_ptr<Array>>&& columns) - : schema_(schema), num_rows_(num_rows), columns_(columns.size()) { + : RecordBatch(schema, num_rows) { + columns_.resize(columns.size()); for (size_t i = 0; i < columns.size(); ++i) { columns_[i] = columns[i]->data(); } @@ -178,16 +185,21 @@ RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, std::vector<std::shared_ptr<internal::ArrayData>>&& columns) - : schema_(schema), num_rows_(num_rows), columns_(std::move(columns)) {} + : RecordBatch(schema, num_rows) { + columns_ = std::move(columns); +} RecordBatch::RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows, const std::vector<std::shared_ptr<internal::ArrayData>>& columns) - : schema_(schema), num_rows_(num_rows), columns_(columns) {} + : RecordBatch(schema, num_rows) { + columns_ = columns; +} std::shared_ptr<Array> RecordBatch::column(int i) const { - std::shared_ptr<Array> result; - DCHECK(MakeArray(columns_[i], &result).ok()); - return result; + if (!boxed_columns_[i]) { + DCHECK(internal::MakeArray(columns_[i], &boxed_columns_[i]).ok()); + } + return boxed_columns_[i]; } const std::string& RecordBatch::column_name(int i) const { http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/cpp/src/arrow/table.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 1145d11..da2722d 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -153,13 +153,9 @@ class ARROW_EXPORT RecordBatch { /// \return true if batches are equal std::shared_ptr<Schema> schema() const { return schema_; } - /// \brief Retrieve an array from the record batch (new object) + /// \brief Retrieve an array from the record batch /// \param[in] i field index, does not boundscheck - /// \return a new Array object - /// - /// \note This function returns a new object. If you intend to dereference - /// the pointer or access the internals, retain a reference to the - /// std::shared_ptr returned. + /// \return an Array object std::shared_ptr<Array> column(int i) const; std::shared_ptr<internal::ArrayData> column_data(int i) const { return columns_[i]; } @@ -197,9 +193,14 @@ class ARROW_EXPORT RecordBatch { Status Validate() const; private: + RecordBatch(const std::shared_ptr<Schema>& schema, int64_t num_rows); + std::shared_ptr<Schema> schema_; int64_t num_rows_; std::vector<std::shared_ptr<internal::ArrayData>> columns_; + + // Caching boxed array data + mutable std::vector<std::shared_ptr<Array>> boxed_columns_; }; /// \class Table http://git-wip-us.apache.org/repos/asf/arrow/blob/5aca7b66/python/pyarrow/tests/test_serialization.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_serialization.py b/python/pyarrow/tests/test_serialization.py index aca3848..7c8cace 100644 --- a/python/pyarrow/tests/test_serialization.py +++ b/python/pyarrow/tests/test_serialization.py @@ -90,6 +90,7 @@ PRIMITIVE_OBJECTS = [ [1, 2, 3, None], [(None,), 3, 1.0], ["h", "e", "l", "l", "o", None], (None, None), ("hello", None), (True, False), {True: "hello", False: "world"}, {"hello": "world", 1: 42, 2.5: 45}, + {"hello": set([2, 3]), "world": set([42.0]), "this": None}, np.int8(3), np.int32(4), np.int64(5), np.uint8(3), np.uint32(4), np.uint64(5), np.float32(1.9), np.float64(1.9), np.zeros([100, 100]),
