This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b28dc5  ARROW-2142: [Python] Allow conversion from Numpy struct array
0b28dc5 is described below

commit 0b28dc50769893ff9d1608754ec499be4b3aa6bb
Author: Antoine Pitrou <anto...@python.org>
AuthorDate: Mon Mar 12 22:38:07 2018 -0400

    ARROW-2142: [Python] Allow conversion from Numpy struct array
    
    Author: Antoine Pitrou <anto...@python.org>
    
    Closes #1635 from pitrou/ARROW-2142-convert-from-np-struct-array and squashes the following commits:
    
    5ade2b26 <Antoine Pitrou> ARROW-2142:  Allow conversion from Numpy struct array
---
 cpp/src/arrow/array-test.cc                 |  64 +++++++++++++++
 cpp/src/arrow/array.cc                      |  81 +++++++++++++++++++
 cpp/src/arrow/array.h                       |  11 +++
 cpp/src/arrow/python/numpy_to_arrow.cc      | 113 +++++++++++++++++++++++++-
 cpp/src/arrow/test-util.h                   |   4 +-
 python/pyarrow/tests/test_convert_pandas.py | 120 +++++++++++++++++++++++++++-
 6 files changed, 388 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/array-test.cc b/cpp/src/arrow/array-test.cc
index bda1946..ad2335f 100644
--- a/cpp/src/arrow/array-test.cc
+++ b/cpp/src/arrow/array-test.cc
@@ -3256,4 +3256,68 @@ TEST_P(DecimalTest, WithNulls) {
 
 INSTANTIATE_TEST_CASE_P(DecimalTest, DecimalTest, ::testing::Range(1, 38));
 
+// ----------------------------------------------------------------------
+// Test rechunking
+
+TEST(TestRechunkArraysConsistently, Trivial) {
+  std::vector<ArrayVector> groups, rechunked;
+  rechunked = internal::RechunkArraysConsistently(groups);
+  ASSERT_EQ(rechunked.size(), 0);
+
+  std::shared_ptr<Array> a1, a2, b1;
+  ArrayFromVector<Int16Type, int16_t>({}, &a1);
+  ArrayFromVector<Int16Type, int16_t>({}, &a2);
+  ArrayFromVector<Int32Type, int32_t>({}, &b1);
+
+  groups = {{a1, a2}, {}, {b1}};
+  rechunked = internal::RechunkArraysConsistently(groups);
+  ASSERT_EQ(rechunked.size(), 3);
+}
+
+TEST(TestRechunkArraysConsistently, Plain) {
+  std::shared_ptr<Array> expected;
+  std::shared_ptr<Array> a1, a2, a3, b1, b2, b3, b4;
+  ArrayFromVector<Int16Type, int16_t>({1, 2, 3}, &a1);
+  ArrayFromVector<Int16Type, int16_t>({4, 5}, &a2);
+  ArrayFromVector<Int16Type, int16_t>({6, 7, 8, 9}, &a3);
+
+  ArrayFromVector<Int32Type, int32_t>({41, 42}, &b1);
+  ArrayFromVector<Int32Type, int32_t>({43, 44, 45}, &b2);
+  ArrayFromVector<Int32Type, int32_t>({46, 47}, &b3);
+  ArrayFromVector<Int32Type, int32_t>({48, 49}, &b4);
+
+  ArrayVector a{a1, a2, a3};
+  ArrayVector b{b1, b2, b3, b4};
+
+  std::vector<ArrayVector> groups{a, b}, rechunked;
+  rechunked = internal::RechunkArraysConsistently(groups);
+  ASSERT_EQ(rechunked.size(), 2);
+  auto ra = rechunked[0];
+  auto rb = rechunked[1];
+
+  ASSERT_EQ(ra.size(), 5);
+  ArrayFromVector<Int16Type, int16_t>({1, 2}, &expected);
+  ASSERT_ARRAYS_EQUAL(*ra[0], *expected);
+  ArrayFromVector<Int16Type, int16_t>({3}, &expected);
+  ASSERT_ARRAYS_EQUAL(*ra[1], *expected);
+  ArrayFromVector<Int16Type, int16_t>({4, 5}, &expected);
+  ASSERT_ARRAYS_EQUAL(*ra[2], *expected);
+  ArrayFromVector<Int16Type, int16_t>({6, 7}, &expected);
+  ASSERT_ARRAYS_EQUAL(*ra[3], *expected);
+  ArrayFromVector<Int16Type, int16_t>({8, 9}, &expected);
+  ASSERT_ARRAYS_EQUAL(*ra[4], *expected);
+
+  ASSERT_EQ(rb.size(), 5);
+  ArrayFromVector<Int32Type, int32_t>({41, 42}, &expected);
+  ASSERT_ARRAYS_EQUAL(*rb[0], *expected);
+  ArrayFromVector<Int32Type, int32_t>({43}, &expected);
+  ASSERT_ARRAYS_EQUAL(*rb[1], *expected);
+  ArrayFromVector<Int32Type, int32_t>({44, 45}, &expected);
+  ASSERT_ARRAYS_EQUAL(*rb[2], *expected);
+  ArrayFromVector<Int32Type, int32_t>({46, 47}, &expected);
+  ASSERT_ARRAYS_EQUAL(*rb[3], *expected);
+  ArrayFromVector<Int32Type, int32_t>({48, 49}, &expected);
+  ASSERT_ARRAYS_EQUAL(*rb[4], *expected);
+}
+
 }  // namespace arrow
diff --git a/cpp/src/arrow/array.cc b/cpp/src/arrow/array.cc
index 83142df..bd2b40c 100644
--- a/cpp/src/arrow/array.cc
+++ b/cpp/src/arrow/array.cc
@@ -20,6 +20,8 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <limits>
+#include <set>
 #include <sstream>
 #include <utility>
 
@@ -753,6 +755,85 @@ std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data) {
 }
 
 // ----------------------------------------------------------------------
+// Misc APIs
+
+namespace internal {
+
+std::vector<ArrayVector> RechunkArraysConsistently(
+    const std::vector<ArrayVector>& groups) {
+  if (groups.size() <= 1) {
+    return groups;
+  }
+  int64_t total_length = 0;
+  for (const auto& array : groups.front()) {
+    total_length += array->length();
+  }
+#ifndef NDEBUG
+  for (const auto& group : groups) {
+    int64_t group_length = 0;
+    for (const auto& array : group) {
+      group_length += array->length();
+    }
+    DCHECK_EQ(group_length, total_length)
+        << "Array groups should have the same total number of elements";
+  }
+#endif
+  if (total_length == 0) {
+    return groups;
+  }
+
+  // One (initially empty) output ArrayVector per input group
+  std::vector<ArrayVector> rechunked_groups(groups.size());
+
+  // Per-group cursor: iterator to the current array and element offset inside it
+  std::vector<ArrayVector::const_iterator> current_arrays;
+  std::vector<int64_t> array_offsets;
+  for (const auto& group : groups) {
+    current_arrays.emplace_back(group.cbegin());
+    array_offsets.emplace_back(0);
+  }
+
+  // Walk all groups in lockstep, emitting one identically-sized chunk per iteration
+  int64_t start = 0;
+  while (start < total_length) {
+    // Next chunk length = smallest remaining length among the groups' current arrays
+    int64_t chunk_length = std::numeric_limits<int64_t>::max();
+    for (size_t i = 0; i < groups.size(); i++) {
+      auto& arr_it = current_arrays[i];
+      auto& offset = array_offsets[i];
+      // Advance past fully consumed arrays (this also skips 0-length arrays)
+      while (offset == (*arr_it)->length()) {
+        ++arr_it;
+        offset = 0;
+      }
+      const auto& array = *arr_it;
+      DCHECK_GT(array->length(), offset);
+      chunk_length = std::min(chunk_length, array->length() - offset);
+    }
+    DCHECK_GT(chunk_length, 0);
+
+    // Emit a chunk_length-sized piece of each group's current array
+    for (size_t i = 0; i < groups.size(); i++) {
+      const auto& array = *current_arrays[i];
+      auto& offset = array_offsets[i];
+      if (offset == 0 && array->length() == chunk_length) {
+        // Chunk covers the whole array: reuse it as is instead of slicing
+        rechunked_groups[i].emplace_back(array);
+      } else {
+        DCHECK_LT(chunk_length - offset, array->length());
+        rechunked_groups[i].emplace_back(array->Slice(offset, chunk_length));
+      }
+      offset += chunk_length;
+    }
+    start += chunk_length;
+  }
+
+  return rechunked_groups;
+}
+
+}  // namespace internal
+
+// ----------------------------------------------------------------------
 // Instantiate templates
 
 template class ARROW_TEMPLATE_EXPORT NumericArray<UInt8Type>;
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index faa9211..04bd012 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -278,6 +278,17 @@ class ARROW_EXPORT Array {
 
 using ArrayVector = std::vector<std::shared_ptr<Array>>;
 
+namespace internal {
+
+/// Given a number of ArrayVectors, treat each ArrayVector as the
+/// chunks of a chunked array.  Then rechunk each ArrayVector such that
+/// all ArrayVectors are chunked identically.  It is mandatory that
+/// all ArrayVectors contain the same total number of elements.
+ARROW_EXPORT
+std::vector<ArrayVector> RechunkArraysConsistently(const 
std::vector<ArrayVector>&);
+
+}  // namespace internal
+
 static inline std::ostream& operator<<(std::ostream& os, const Array& x) {
   os << x.ToString();
   return os;
diff --git a/cpp/src/arrow/python/numpy_to_arrow.cc b/cpp/src/arrow/python/numpy_to_arrow.cc
index c22afb7..4d91e53 100644
--- a/cpp/src/arrow/python/numpy_to_arrow.cc
+++ b/cpp/src/arrow/python/numpy_to_arrow.cc
@@ -380,7 +380,7 @@ class NumPyConverter {
 
   Status Convert();
 
-  const std::vector<std::shared_ptr<Array>>& result() const { return 
out_arrays_; }
+  const ArrayVector& result() const { return out_arrays_; }
 
   template <typename T>
   typename std::enable_if<std::is_base_of<PrimitiveCType, T>::value ||
@@ -406,6 +406,8 @@ class NumPyConverter {
   // NumPy unicode arrays
   Status Visit(const StringType& type);
 
+  Status Visit(const StructType& type);
+
   Status Visit(const FixedSizeBinaryType& type) {
     return TypeNotImplemented(type.ToString());
   }
@@ -531,7 +533,7 @@ class NumPyConverter {
   OwnedRefNoGIL decimal_type_;
 
   // Used in visitor pattern
-  std::vector<std::shared_ptr<Array>> out_arrays_;
+  ArrayVector out_arrays_;
 
   std::shared_ptr<ResizableBuffer> null_bitmap_;
   uint8_t* null_bitmap_data_;
@@ -1607,6 +1609,113 @@ Status NumPyConverter::Visit(const StringType& type) {
   return PushArray(result->data());
 }
 
+Status NumPyConverter::Visit(const StructType& type) {
+  std::vector<NumPyConverter> sub_converters;
+  std::vector<OwnedRefNoGIL> sub_arrays;
+
+  {
+    PyAcquireGIL gil_lock;
+
+    // Create one sub-converter per struct field, fed by PyArray_GetField(arr_, ...)
+    if (dtype_->fields == NULL || !PyDict_Check(dtype_->fields)) {
+      return Status::TypeError("Expected struct array");
+    }
+
+    for (auto field : type.children()) {
+      PyObject* tup = PyDict_GetItemString(dtype_->fields, 
field->name().c_str());
+      if (tup == NULL) {
+        std::stringstream ss;
+        ss << "Missing field '" << field->name() << "' in struct array";
+        return Status::TypeError(ss.str());
+      }
+      PyArray_Descr* sub_dtype =
+          reinterpret_cast<PyArray_Descr*>(PyTuple_GET_ITEM(tup, 0));
+      DCHECK(PyArray_DescrCheck(sub_dtype));
+      int offset = static_cast<int>(PyLong_AsLong(PyTuple_GET_ITEM(tup, 1)));
+      RETURN_IF_PYERROR();
+      Py_INCREF(sub_dtype); /* PyArray_GetField() steals ref */
+      PyObject* sub_array = PyArray_GetField(arr_, sub_dtype, offset);
+      RETURN_IF_PYERROR();
+      sub_arrays.emplace_back(sub_array);
+      sub_converters.emplace_back(pool_, sub_array, nullptr /* mask */, 
field->type(),
+                                  use_pandas_null_sentinels_);
+    }
+  }
+
+  std::vector<ArrayVector> groups;
+  int64_t null_count = 0;
+
+  // Compute null bitmap and store it as a Boolean Array to include it
+  // in the rechunking below
+  {
+    if (mask_ != nullptr) {
+      RETURN_NOT_OK(InitNullBitmap());
+      null_count = MaskToBitmap(mask_, length_, null_bitmap_data_);
+    }
+    groups.push_back({std::make_shared<BooleanArray>(length_, null_bitmap_)});
+  }
+
+  // Convert child data (NOTE(review): the length sum `n` below is computed but never read)
+  for (auto& converter : sub_converters) {
+    RETURN_NOT_OK(converter.Convert());
+    groups.push_back(converter.result());
+    const auto& group = groups.back();
+    int64_t n = 0;
+    for (const auto& array : group) {
+      n += array->length();
+    }
+  }
+  // Rechunk all groups identically (NOTE(review): `n` in the loop after it is also unused)
+  groups = ::arrow::internal::RechunkArraysConsistently(groups);
+  for (const auto& group : groups) {
+    int64_t n = 0;
+    for (const auto& array : group) {
+      n += array->length();
+    }
+  }
+
+  // Make struct array chunks by combining groups
+  size_t ngroups = groups.size();
+  size_t nchunks = groups[0].size();
+  for (size_t chunk = 0; chunk < nchunks; chunk++) {
+    // groups[0] holds the null bitmap wrapped in a BooleanArray (pushed first above)
+    const auto& null_data = groups[0][chunk]->data();
+    DCHECK_EQ(null_data->type->id(), Type::BOOL);
+    DCHECK_EQ(null_data->buffers.size(), 2);
+    const auto& null_buffer = null_data->buffers[1];
+    // Careful: the rechunked null bitmap may have a non-zero offset
+    // to its buffer, and it may not even start on a byte boundary
+    int64_t null_offset = null_data->offset;
+    std::shared_ptr<Buffer> fixed_null_buffer;
+
+    if (!null_buffer) {
+      fixed_null_buffer = null_buffer;
+    } else if (null_offset % 8 == 0) {
+      fixed_null_buffer =
+          std::make_shared<Buffer>(null_buffer,
+                                   // byte offset
+                                   null_offset / 8,
+                                   // byte size
+                                   BitUtil::BytesForBits(null_data->length));
+    } else {
+      RETURN_NOT_OK(CopyBitmap(pool_, null_buffer->data(), null_offset, 
null_data->length,
+                               &fixed_null_buffer));
+    }
+
+    // Create struct array chunk and populate it
+    auto arr_data =
+        ArrayData::Make(type_, null_data->length, null_count ? 
kUnknownNullCount : 0, 0);
+    arr_data->buffers.push_back(fixed_null_buffer);
+    // Append child chunks (groups[1..] are the converted fields, one group per field)
+    for (size_t i = 1; i < ngroups; i++) {
+      arr_data->child_data.push_back(groups[i][chunk]->data());
+    }
+    RETURN_NOT_OK(PushArray(arr_data));
+  }
+
+  return Status::OK();
+}
+
 Status NdarrayToArrow(MemoryPool* pool, PyObject* ao, PyObject* mo,
                       bool use_pandas_null_sentinels,
                       const std::shared_ptr<DataType>& type,
diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h
index ab68fd4..fdd42a6 100644
--- a/cpp/src/arrow/test-util.h
+++ b/cpp/src/arrow/test-util.h
@@ -401,8 +401,8 @@ void ArrayFromVector(const std::vector<bool>& is_valid, const std::vector<C_TYPE
 template <typename TYPE, typename C_TYPE>
 void ArrayFromVector(const std::vector<C_TYPE>& values, 
std::shared_ptr<Array>* out) {
   typename TypeTraits<TYPE>::BuilderType builder;
-  for (size_t i = 0; i < values.size(); ++i) {
-    ASSERT_OK(builder.Append(values[i]));
+  for (auto& value : values) {
+    ASSERT_OK(builder.Append(value));
   }
   ASSERT_OK(builder.Finish(out));
 }
diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py
index 48db204..dff4e10 100644
--- a/python/pyarrow/tests/test_convert_pandas.py
+++ b/python/pyarrow/tests/test_convert_pandas.py
@@ -1466,7 +1466,7 @@ class TestConvertStructTypes(object):
     Conversion tests for struct types.
     """
 
-    def test_structarray(self):
+    def test_to_pandas(self):
         ints = pa.array([None, 2, 3], type=pa.int64())
         strs = pa.array([u'a', None, u'c'], type=pa.string())
         bools = pa.array([True, False, None], type=pa.bool_())
@@ -1483,6 +1483,124 @@ class TestConvertStructTypes(object):
         series = pd.Series(arr.to_pandas())
         tm.assert_series_equal(series, expected)
 
+    def test_from_numpy(self):
+        dt = np.dtype([('x', np.int32),
+                       (('y_title', 'y'), np.bool_)])
+        ty = pa.struct([pa.field('x', pa.int32()),
+                        pa.field('y', pa.bool_())])
+
+        data = np.array([], dtype=dt)
+        arr = pa.array(data, type=ty)
+        assert arr.to_pylist() == []
+
+        data = np.array([(42, True), (43, False)], dtype=dt)
+        arr = pa.array(data, type=ty)
+        assert arr.to_pylist() == [{'x': 42, 'y': True},
+                                   {'x': 43, 'y': False}]
+
+        # With mask (a True mask entry makes that struct element null)
+        arr = pa.array(data, mask=np.bool_([False, True]), type=ty)
+        assert arr.to_pylist() == [{'x': 42, 'y': True}, None]
+
+        # Trivial struct type (no fields on either the NumPy or the Arrow side)
+        dt = np.dtype([])
+        ty = pa.struct([])
+
+        data = np.array([], dtype=dt)
+        arr = pa.array(data, type=ty)
+        assert arr.to_pylist() == []
+
+        data = np.array([(), ()], dtype=dt)
+        arr = pa.array(data, type=ty)
+        assert arr.to_pylist() == [{}, {}]
+
+    def test_from_numpy_nested(self):
+        dt = np.dtype([('x', np.dtype([('xx', np.int8),
+                                       ('yy', np.bool_)])),
+                       ('y', np.int16)])
+        ty = pa.struct([pa.field('x', pa.struct([pa.field('xx', pa.int8()),
+                                                 pa.field('yy', pa.bool_())])),
+                        pa.field('y', pa.int16())])
+
+        data = np.array([], dtype=dt)
+        arr = pa.array(data, type=ty)
+        assert arr.to_pylist() == []
+
+        data = np.array([((1, True), 2), ((3, False), 4)], dtype=dt)
+        arr = pa.array(data, type=ty)
+        assert arr.to_pylist() == [{'x': {'xx': 1, 'yy': True}, 'y': 2},
+                                   {'x': {'xx': 3, 'yy': False}, 'y': 4}]
+
+    @pytest.mark.large_memory
+    def test_from_numpy_large(self):
+        # Exercise rechunking + nulls
+        target_size = 3 * 1024**3  # 3 GiB
+        dt = np.dtype([('x', np.float64), ('y', 'object')])
+        bs = 65536 - dt.itemsize
+        block = b'.' * bs
+        n = target_size // (bs + dt.itemsize)
+        data = np.zeros(n, dtype=dt)
+        data['x'] = np.random.random_sample(n)
+        data['y'] = block
+        # Add implicit nulls: NaN in 'x' is expected to come back as None (see check())
+        data['x'][data['x'] < 0.2] = np.nan
+
+        ty = pa.struct([pa.field('x', pa.float64()),
+                        pa.field('y', pa.binary(bs))])
+        arr = pa.array(data, type=ty, from_pandas=True)
+        assert arr.num_chunks == 2
+
+        def iter_chunked_array(arr):
+            for chunk in arr.iterchunks():
+                for item in chunk:
+                    yield item
+
+        def check(arr, data, mask=None):
+            assert len(arr) == len(data)
+            xs = data['x']
+            ys = data['y']
+            for i, obj in enumerate(iter_chunked_array(arr)):
+                try:
+                    d = obj.as_py()
+                    if mask is not None and mask[i]:
+                        assert d is None
+                    else:
+                        x = xs[i]
+                        if np.isnan(x):
+                            assert d['x'] is None
+                        else:
+                            assert d['x'] == x
+                        assert d['y'] == ys[i]
+                except Exception:
+                    print("Failed at index", i)
+                    raise
+
+        check(arr, data)
+        del arr
+
+        # Now with explicit mask
+        mask = np.random.random_sample(n) < 0.2
+        arr = pa.array(data, type=ty, mask=mask, from_pandas=True)
+        assert arr.num_chunks == 2
+
+        check(arr, data, mask)
+        del arr
+
+    def test_from_numpy_bad_input(self):
+        ty = pa.struct([pa.field('x', pa.int32()),
+                        pa.field('y', pa.bool_())])
+        dt = np.dtype([('x', np.int32),
+                       ('z', np.bool_)])
+
+        data = np.array([], dtype=dt)
+        with pytest.raises(TypeError,
+                           match="Missing field 'y'"):
+            pa.array(data, type=ty)
+        data = np.int32([])
+        with pytest.raises(TypeError,
+                           match="Expected struct array"):
+            pa.array(data, type=ty)
+
 
 class TestZeroCopyConversion(object):
     """

-- 
To stop receiving notification emails like this one, please contact
w...@apache.org.

Reply via email to