This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 26c6805  ARROW-1886: [C++/Python] Flatten struct columns in table
26c6805 is described below

commit 26c6805981415c441e1f757213b7d410295f76f7
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu May 3 15:12:02 2018 +0200

    ARROW-1886: [C++/Python] Flatten struct columns in table
    
    Add C++ and Python APIs to flatten struct fields and struct columns.
    
    Based on PR #1755.
    
    Author: Antoine Pitrou <[email protected]>
    
    Closes #1768 from pitrou/ARROW-1886-flatten-table and squashes the following commits:
    
    a821b77 <Antoine Pitrou> Add test for empty column
    b8335af <Antoine Pitrou> ARROW-1886:  Flatten struct columns in table
---
 cpp/src/arrow/table.cc               | 77 +++++++++++++++++++++++++++++++++++-
 cpp/src/arrow/table.h                | 20 ++++++++++
 cpp/src/arrow/type-test.cc           | 27 +++++++++++++
 cpp/src/arrow/type.cc                | 15 +++++++
 cpp/src/arrow/type.h                 |  2 +
 cpp/src/arrow/util/bit-util-test.cc  |  3 +-
 python/pyarrow/includes/libarrow.pxd |  7 ++++
 python/pyarrow/table.pxi             | 65 +++++++++++++++++++++++++++++-
 python/pyarrow/tests/test_schema.py  | 20 ++++++++++
 python/pyarrow/tests/test_table.py   | 42 ++++++++++++++++++++
 python/pyarrow/types.pxi             | 14 +++++++
 11 files changed, 288 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 8af47ea..313b518 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -139,6 +139,38 @@ std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t offset) const {
   return Slice(offset, length_);
 }
 
+Status ChunkedArray::Flatten(MemoryPool* pool,
+                             std::vector<std::shared_ptr<ChunkedArray>>* out) const {
+  std::vector<std::shared_ptr<ChunkedArray>> flattened;
+  if (type()->id() != Type::STRUCT) {
+    // Emulate non-existent copy constructor
+    flattened.emplace_back(std::make_shared<ChunkedArray>(chunks_, type_));
+    *out = flattened;
+    return Status::OK();
+  }
+  std::vector<ArrayVector> flattened_chunks;
+  for (const auto& chunk : chunks_) {
+    ArrayVector res;
+    RETURN_NOT_OK(dynamic_cast<const StructArray&>(*chunk).Flatten(pool, &res));
+    if (!flattened_chunks.size()) {
+      // First chunk
+      for (const auto& array : res) {
+        flattened_chunks.push_back({array});
+      }
+    } else {
+      DCHECK_EQ(flattened_chunks.size(), res.size());
+      for (size_t i = 0; i < res.size(); ++i) {
+        flattened_chunks[i].push_back(res[i]);
+      }
+    }
+  }
+  for (const auto& vec : flattened_chunks) {
+    flattened.emplace_back(std::make_shared<ChunkedArray>(vec));
+  }
+  *out = flattened;
+  return Status::OK();
+}
+
 Column::Column(const std::shared_ptr<Field>& field, const ArrayVector& chunks)
     : field_(field) {
   data_ = std::make_shared<ChunkedArray>(chunks, field->type());
@@ -160,6 +192,20 @@ Column::Column(const std::shared_ptr<Field>& field,
                const std::shared_ptr<ChunkedArray>& data)
     : field_(field), data_(data) {}
 
+Status Column::Flatten(MemoryPool* pool,
+                       std::vector<std::shared_ptr<Column>>* out) const {
+  std::vector<std::shared_ptr<Column>> flattened;
+  std::vector<std::shared_ptr<Field>> flattened_fields = field_->Flatten();
+  std::vector<std::shared_ptr<ChunkedArray>> flattened_data;
+  RETURN_NOT_OK(data_->Flatten(pool, &flattened_data));
+  DCHECK_EQ(flattened_fields.size(), flattened_data.size());
+  for (size_t i = 0; i < flattened_fields.size(); ++i) {
+    flattened.push_back(std::make_shared<Column>(flattened_fields[i], flattened_data[i]));
+  }
+  *out = flattened;
+  return Status::OK();
+}
+
 bool Column::Equals(const Column& other) const {
   if (!field_->Equals(other.field())) {
     return false;
@@ -268,12 +314,28 @@ class SimpleTable : public Table {
     return Table::Make(new_schema, columns_);
   }
 
+  Status Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const override {
+    std::vector<std::shared_ptr<Field>> flattened_fields;
+    std::vector<std::shared_ptr<Column>> flattened_columns;
+    for (const auto& column : columns_) {
+      std::vector<std::shared_ptr<Column>> new_columns;
+      RETURN_NOT_OK(column->Flatten(pool, &new_columns));
+      for (const auto& new_col : new_columns) {
+        flattened_fields.push_back(new_col->field());
+        flattened_columns.push_back(new_col);
+      }
+    }
+    auto flattened_schema =
+        std::make_shared<Schema>(flattened_fields, schema_->metadata());
+    *out = Table::Make(flattened_schema, flattened_columns);
+    return Status::OK();
+  }
+
   Status Validate() const override {
+    // Make sure columns and schema are consistent
     if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
       return Status::Invalid("Number of columns did not match schema");
     }
-
-    // Make sure columns are all the same length
     for (int i = 0; i < num_columns(); ++i) {
       const Column* col = columns_[i].get();
       if (col == nullptr) {
@@ -281,6 +343,17 @@ class SimpleTable : public Table {
         ss << "Column " << i << " was null";
         return Status::Invalid(ss.str());
       }
+      if (!col->field()->Equals(*schema_->field(i))) {
+        std::stringstream ss;
+        ss << "Column field " << i << " named " << col->name()
+           << " is inconsistent with schema";
+        return Status::Invalid(ss.str());
+      }
+    }
+
+    // Make sure columns are all the same length
+    for (int i = 0; i < num_columns(); ++i) {
+      const Column* col = columns_[i].get();
       if (col->length() != num_rows_) {
         std::stringstream ss;
         ss << "Column " << i << " named " << col->name() << " expected length "
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 32af224..7fa207f 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -69,6 +69,13 @@ class ARROW_EXPORT ChunkedArray {
   /// \brief Slice from offset until end of the chunked array
   std::shared_ptr<ChunkedArray> Slice(int64_t offset) const;
 
+  /// \brief Flatten this chunked array as a vector of chunked arrays, one
+  /// for each struct field
+  ///
+  /// \param[in] pool The pool for buffer allocations, if any
+  /// \param[out] out The resulting vector of arrays
+  Status Flatten(MemoryPool* pool, std::vector<std::shared_ptr<ChunkedArray>>* out) const;
+
   std::shared_ptr<DataType> type() const { return type_; }
 
   bool Equals(const ChunkedArray& other) const;
@@ -133,6 +140,12 @@ class ARROW_EXPORT Column {
     return std::make_shared<Column>(field_, data_->Slice(offset));
   }
 
+  /// \brief Flatten this column as a vector of columns
+  ///
+  /// \param[in] pool The pool for buffer allocations, if any
+  /// \param[out] out The resulting vector of arrays
+  Status Flatten(MemoryPool* pool, std::vector<std::shared_ptr<Column>>* out) const;
+
   bool Equals(const Column& other) const;
   bool Equals(const std::shared_ptr<Column>& other) const;
 
@@ -215,6 +228,13 @@ class ARROW_EXPORT Table {
   virtual std::shared_ptr<Table> ReplaceSchemaMetadata(
       const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
 
+  /// \brief Flatten the table, producing a new Table.  Any column with a
+  /// struct type will be flattened into multiple columns
+  ///
+  /// \param[in] pool The pool for buffer allocations, if any
+  /// \param[out] out The returned table
+  virtual Status Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const = 0;
+
   /// \brief Perform any checks to validate the input arguments
   virtual Status Validate() const = 0;
 
diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc
index f62d14d..e9245d5 100644
--- a/cpp/src/arrow/type-test.cc
+++ b/cpp/src/arrow/type-test.cc
@@ -86,6 +86,33 @@ TEST(TestField, TestRemoveMetadata) {
   ASSERT_TRUE(f2->metadata() == nullptr);
 }
 
+TEST(TestField, TestFlatten) {
+  auto metadata = std::shared_ptr<KeyValueMetadata>(
+      new KeyValueMetadata({"foo", "bar"}, {"bizz", "buzz"}));
+  auto f0 = field("f0", int32(), true /* nullable */, metadata);
+  auto vec = f0->Flatten();
+  ASSERT_EQ(vec.size(), 1);
+  ASSERT_TRUE(vec[0]->Equals(*f0));
+
+  auto f1 = field("f1", float64(), false /* nullable */);
+  auto ff = field("nest", struct_({f0, f1}));
+  vec = ff->Flatten();
+  ASSERT_EQ(vec.size(), 2);
+  auto expected0 = field("nest.f0", int32(), true /* nullable */, metadata);
+  // nullable parent implies nullable flattened child
+  auto expected1 = field("nest.f1", float64(), true /* nullable */);
+  ASSERT_TRUE(vec[0]->Equals(*expected0));
+  ASSERT_TRUE(vec[1]->Equals(*expected1));
+
+  ff = field("nest", struct_({f0, f1}), false /* nullable */);
+  vec = ff->Flatten();
+  ASSERT_EQ(vec.size(), 2);
+  expected0 = field("nest.f0", int32(), true /* nullable */, metadata);
+  expected1 = field("nest.f1", float64(), false /* nullable */);
+  ASSERT_TRUE(vec[0]->Equals(*expected0));
+  ASSERT_TRUE(vec[1]->Equals(*expected1));
+}
+
 class TestSchema : public ::testing::Test {
  public:
   void SetUp() {}
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 16e7585..2f6e718 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -42,6 +42,21 @@ std::shared_ptr<Field> Field::RemoveMetadata() const {
   return std::make_shared<Field>(name_, type_, nullable_);
 }
 
+std::vector<std::shared_ptr<Field>> Field::Flatten() const {
+  std::vector<std::shared_ptr<Field>> flattened;
+  if (type_->id() == Type::STRUCT) {
+    for (const auto& child : type_->children()) {
+      auto flattened_child = std::make_shared<Field>(*child);
+      flattened.push_back(flattened_child);
+      flattened_child->name_.insert(0, name() + ".");
+      flattened_child->nullable_ |= nullable_;
+    }
+  } else {
+    flattened.push_back(std::make_shared<Field>(*this));
+  }
+  return flattened;
+}
+
 bool Field::Equals(const Field& other) const {
   if (this == &other) {
     return true;
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 9cd1d8f..915c0c7 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -236,6 +236,8 @@ class ARROW_EXPORT Field {
       const std::shared_ptr<const KeyValueMetadata>& metadata) const;
   std::shared_ptr<Field> RemoveMetadata() const;
 
+  std::vector<std::shared_ptr<Field>> Flatten() const;
+
   bool Equals(const Field& other) const;
   bool Equals(const std::shared_ptr<Field>& other) const;
 
diff --git a/cpp/src/arrow/util/bit-util-test.cc b/cpp/src/arrow/util/bit-util-test.cc
index ba4d8a3..c527244 100644
--- a/cpp/src/arrow/util/bit-util-test.cc
+++ b/cpp/src/arrow/util/bit-util-test.cc
@@ -22,6 +22,7 @@
 #include <initializer_list>
 #include <limits>
 #include <memory>
+#include <valarray>
 #include <vector>
 
 #include <gtest/gtest.h>
@@ -92,7 +93,7 @@ void ASSERT_READER_VALUES(internal::BitmapReader& reader, std::vector<int> value
 // Assert equal contents of a memory area and a vector of bytes
 void ASSERT_BYTES_EQ(const uint8_t* left, const std::vector<uint8_t>& right) {
   auto left_array = std::vector<uint8_t>(left, left + right.size());
-  ASSERT_EQ(std::vector<uint8_t>(std::begin(left_array), std::end(left_array)), right);
+  ASSERT_EQ(left_array, right);
 }
 
 TEST(BitUtilTests, TestIsMultipleOf64) {
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 1dcff8a..12bbefb 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -258,6 +258,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         shared_ptr[CField] AddMetadata(
             const shared_ptr[CKeyValueMetadata]& metadata)
         shared_ptr[CField] RemoveMetadata()
+        vector[shared_ptr[CField]] Flatten()
 
     cdef cppclass CStructType" arrow::StructType"(CDataType):
         CStructType(const vector[shared_ptr[CField]]& fields)
@@ -437,6 +438,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
 
         c_bool Equals(const CColumn& other)
 
+        CStatus Flatten(CMemoryPool* pool, vector[shared_ptr[CColumn]]* out)
+
         shared_ptr[CField] field()
 
         int64_t length()
@@ -495,6 +498,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
                           shared_ptr[CTable]* out)
         CStatus RemoveColumn(int i, shared_ptr[CTable]* out)
 
+        CStatus Flatten(CMemoryPool* pool, shared_ptr[CTable]* out)
+
+        CStatus Validate()
+
         shared_ptr[CTable] ReplaceSchemaMetadata(
             const shared_ptr[CKeyValueMetadata]& metadata)
 
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index a97fde2..c867657 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -271,13 +271,22 @@ cdef class Column:
     def __repr__(self):
         from pyarrow.compat import StringIO
         result = StringIO()
-        result.write(object.__repr__(self))
+        result.write('<Column name={0!r} type={1!r}>'
+                     .format(self.name, self.type))
         data = self.data
         for i, chunk in enumerate(data.chunks):
             result.write('\nchunk {0}: {1}'.format(i, repr(chunk)))
 
         return result.getvalue()
 
+    def __richcmp__(Column self, Column other, int op):
+        if op == cp.Py_EQ:
+            return self.equals(other)
+        elif op == cp.Py_NE:
+            return not self.equals(other)
+        else:
+            raise TypeError('Invalid comparison')
+
     @staticmethod
     def from_array(*args):
         return column(*args)
@@ -315,6 +324,29 @@ cdef class Column:
         casted_data = pyarrow_wrap_chunked_array(out.chunked_array())
         return column(self.name, casted_data)
 
+    def flatten(self, MemoryPool memory_pool=None):
+        """
+        Flatten this Column.  If it has a struct type, the column is
+        flattened into one column per struct field.
+
+        Parameters
+        ----------
+        memory_pool : MemoryPool, default None
+            For memory allocations, if required, otherwise use default pool
+
+        Returns
+        -------
+        result : List[Column]
+        """
+        cdef:
+            vector[shared_ptr[CColumn]] flattened
+            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+        with nogil:
+            check_status(self.column.Flatten(pool, &flattened))
+
+        return [pyarrow_wrap_column(col) for col in flattened]
+
     def to_pandas(self,
                   c_bool strings_to_categorical=False,
                   c_bool zero_copy_only=False,
@@ -843,6 +875,14 @@ cdef class Table:
             )
         return 0
 
+    def _validate(self):
+        """
+        Validate table consistency.
+        """
+        self._check_nullptr()
+        with nogil:
+            check_status(self.table.Validate())
+
     def replace_schema_metadata(self, dict metadata=None):
         """
         EXPERIMENTAL: Create shallow copy of table by replacing schema
@@ -867,6 +907,29 @@ cdef class Table:
 
         return pyarrow_wrap_table(new_table)
 
+    def flatten(self, MemoryPool memory_pool=None):
+        """
+        Flatten this Table.  Each column with a struct type is flattened
+        into one column per struct field.  Other columns are left unchanged.
+
+        Parameters
+        ----------
+        memory_pool : MemoryPool, default None
+            For memory allocations, if required, otherwise use default pool
+
+        Returns
+        -------
+        result : Table
+        """
+        cdef:
+            shared_ptr[CTable] flattened
+            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+        with nogil:
+            check_status(self.table.Flatten(pool, &flattened))
+
+        return pyarrow_wrap_table(flattened)
+
     def equals(self, Table other):
         """
         Check if contents of two tables are equal
diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py
index 6f99c12..9570506 100644
--- a/python/pyarrow/tests/test_schema.py
+++ b/python/pyarrow/tests/test_schema.py
@@ -254,6 +254,26 @@ def test_field_add_remove_metadata():
     assert f5.equals(f6)
 
 
+def test_field_flatten():
+    f0 = pa.field('foo', pa.int32()).add_metadata({b'foo': b'bar'})
+    assert f0.flatten() == [f0]
+
+    f1 = pa.field('bar', pa.float64(), nullable=False)
+    ff = pa.field('ff', pa.struct([f0, f1]), nullable=False)
+    assert ff.flatten() == [
+        pa.field('ff.foo', pa.int32()).add_metadata({b'foo': b'bar'}),
+        pa.field('ff.bar', pa.float64(), nullable=False)]  # XXX
+
+    # Nullable parent makes flattened child nullable
+    ff = pa.field('ff', pa.struct([f0, f1]))
+    assert ff.flatten() == [
+        pa.field('ff.foo', pa.int32()).add_metadata({b'foo': b'bar'}),
+        pa.field('ff.bar', pa.float64())]
+
+    fff = pa.field('fff', pa.struct([ff]))
+    assert fff.flatten() == [pa.field('fff.ff', pa.struct([f0, f1]))]
+
+
 def test_schema_add_remove_metadata():
     fields = [
         pa.field('foo', pa.int32()),
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 100f2b0..9a0a482 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -97,6 +97,23 @@ def test_column_to_pandas():
     assert series.iloc[0] == -10
 
 
+def test_column_flatten():
+    ty = pa.struct([pa.field('x', pa.int16()),
+                    pa.field('y', pa.float32())])
+    a = pa.array([(1, 2.5), (3, 4.5), (5, 6.5)], type=ty)
+    col = pa.Column.from_array('foo', a)
+    x, y = col.flatten()
+    assert x == pa.column('foo.x', pa.array([1, 3, 5], type=pa.int16()))
+    assert y == pa.column('foo.y', pa.array([2.5, 4.5, 6.5],
+                                            type=pa.float32()))
+    # Empty column
+    a = pa.array([], type=ty)
+    col = pa.Column.from_array('foo', a)
+    x, y = col.flatten()
+    assert x == pa.column('foo.x', pa.array([], type=pa.int16()))
+    assert y == pa.column('foo.y', pa.array([], type=pa.float32()))
+
+
 def test_recordbatch_basics():
     data = [
         pa.array(range(5)),
@@ -269,6 +286,7 @@ def test_table_basics():
         pa.array([-10, -5, 0, 5, 10])
     ]
     table = pa.Table.from_arrays(data, names=('a', 'b'))
+    table._validate()
     assert len(table) == 5
     assert table.num_rows == 5
     assert table.num_columns == 2
@@ -367,6 +385,7 @@ def test_table_remove_column():
     table = pa.Table.from_arrays(data, names=('a', 'b', 'c'))
 
     t2 = table.remove_column(0)
+    t2._validate()
     expected = pa.Table.from_arrays(data[1:], names=('b', 'c'))
     assert t2.equals(expected)
 
@@ -379,12 +398,34 @@ def test_table_remove_column_empty():
     table = pa.Table.from_arrays(data, names=['a'])
 
     t2 = table.remove_column(0)
+    t2._validate()
     assert len(t2) == len(table)
 
     t3 = t2.add_column(0, table[0])
+    t3._validate()
     assert t3.equals(table)
 
 
+def test_table_flatten():
+    ty1 = pa.struct([pa.field('x', pa.int16()),
+                     pa.field('y', pa.float32())])
+    ty2 = pa.struct([pa.field('nest', ty1)])
+    a = pa.array([(1, 2.5), (3, 4.5)], type=ty1)
+    b = pa.array([((11, 12.5),), ((13, 14.5),)], type=ty2)
+    c = pa.array([False, True], type=pa.bool_())
+
+    table = pa.Table.from_arrays([a, b, c], names=['a', 'b', 'c'])
+    t2 = table.flatten()
+    t2._validate()
+    expected = pa.Table.from_arrays([
+        pa.array([1, 3], type=pa.int16()),
+        pa.array([2.5, 4.5], type=pa.float32()),
+        pa.array([(11, 12.5), (13, 14.5)], type=ty1),
+        c],
+        names=['a.x', 'a.y', 'b.nest', 'c'])
+    assert t2.equals(expected)
+
+
 def test_concat_tables():
     data = [
         list(range(5)),
@@ -401,6 +442,7 @@ def test_concat_tables():
                               names=('a', 'b'))
 
     result = pa.concat_tables([t1, t2])
+    result._validate()
     assert len(result) == 10
 
     expected = pa.Table.from_arrays([pa.array(x + y)
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 9cd9bed..2922dc2 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -487,6 +487,20 @@ cdef class Field:
             new_field = self.field.RemoveMetadata()
         return pyarrow_wrap_field(new_field)
 
+    def flatten(self):
+        """
+        Flatten this field.  If a struct field, individual child fields
+        will be returned with their names prefixed by the parent's name.
+
+        Returns
+        -------
+        fields : List[pyarrow.Field]
+        """
+        cdef vector[shared_ptr[CField]] flattened
+        with nogil:
+            flattened = self.field.Flatten()
+        return [pyarrow_wrap_field(f) for f in flattened]
+
 
 cdef class Schema:
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to