This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 26c6805 ARROW-1886: [C++/Python] Flatten struct columns in table
26c6805 is described below
commit 26c6805981415c441e1f757213b7d410295f76f7
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu May 3 15:12:02 2018 +0200
ARROW-1886: [C++/Python] Flatten struct columns in table
Add C++ and Python APIs to flatten struct fields and struct columns.
Based on PR #1755.
Author: Antoine Pitrou <[email protected]>
Closes #1768 from pitrou/ARROW-1886-flatten-table and squashes the
following commits:
a821b77 <Antoine Pitrou> Add test for empty column
b8335af <Antoine Pitrou> ARROW-1886: Flatten struct columns in table
---
cpp/src/arrow/table.cc | 77 +++++++++++++++++++++++++++++++++++-
cpp/src/arrow/table.h | 20 ++++++++++
cpp/src/arrow/type-test.cc | 27 +++++++++++++
cpp/src/arrow/type.cc | 15 +++++++
cpp/src/arrow/type.h | 2 +
cpp/src/arrow/util/bit-util-test.cc | 3 +-
python/pyarrow/includes/libarrow.pxd | 7 ++++
python/pyarrow/table.pxi | 65 +++++++++++++++++++++++++++++-
python/pyarrow/tests/test_schema.py | 20 ++++++++++
python/pyarrow/tests/test_table.py | 42 ++++++++++++++++++++
python/pyarrow/types.pxi | 14 +++++++
11 files changed, 288 insertions(+), 4 deletions(-)
diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc
index 8af47ea..313b518 100644
--- a/cpp/src/arrow/table.cc
+++ b/cpp/src/arrow/table.cc
@@ -139,6 +139,38 @@ std::shared_ptr<ChunkedArray> ChunkedArray::Slice(int64_t
offset) const {
return Slice(offset, length_);
}
+// Flatten a chunked array into one chunked array per struct field.
+// A non-struct chunked array yields a single chunked array with the same
+// chunks and type. A struct chunked array yields one chunked array per child
+// field of type(), built from the per-chunk results of StructArray::Flatten.
+Status ChunkedArray::Flatten(MemoryPool* pool,
+                             std::vector<std::shared_ptr<ChunkedArray>>* out) const {
+  std::vector<std::shared_ptr<ChunkedArray>> flattened;
+  if (type()->id() != Type::STRUCT) {
+    // Emulate non-existent copy constructor
+    flattened.emplace_back(std::make_shared<ChunkedArray>(chunks_, type_));
+    *out = flattened;
+    return Status::OK();
+  }
+
+  // Size the per-field chunk lists from the struct type, not from the first
+  // chunk. This way a chunked array with zero chunks still flattens into one
+  // (empty) chunked array per field. Callers such as Column::Flatten pair
+  // these results index-by-index with Field::Flatten(), which is also sized
+  // from the struct type.
+  const auto& fields = type()->children();
+  std::vector<ArrayVector> flattened_chunks(fields.size());
+  for (const auto& chunk : chunks_) {
+    ArrayVector res;
+    RETURN_NOT_OK(dynamic_cast<const StructArray&>(*chunk).Flatten(pool, &res));
+    DCHECK_EQ(flattened_chunks.size(), res.size());
+    for (size_t i = 0; i < res.size(); ++i) {
+      flattened_chunks[i].push_back(res[i]);
+    }
+  }
+
+  // Pass each child's type explicitly: a field's chunk list may be empty,
+  // and the type-less ChunkedArray constructor presumably derives its type
+  // from the first chunk (NOTE(review): confirm in ChunkedArray ctor).
+  flattened.reserve(fields.size());
+  for (size_t i = 0; i < fields.size(); ++i) {
+    flattened.emplace_back(
+        std::make_shared<ChunkedArray>(flattened_chunks[i], fields[i]->type()));
+  }
+  *out = flattened;
+  return Status::OK();
+}
+
Column::Column(const std::shared_ptr<Field>& field, const ArrayVector& chunks)
: field_(field) {
data_ = std::make_shared<ChunkedArray>(chunks, field->type());
@@ -160,6 +192,20 @@ Column::Column(const std::shared_ptr<Field>& field,
const std::shared_ptr<ChunkedArray>& data)
: field_(field), data_(data) {}
+// Flatten this column into one column per struct field. Names and
+// nullability come from Field::Flatten(); data comes from
+// ChunkedArray::Flatten(). Non-struct columns yield a single column.
+Status Column::Flatten(MemoryPool* pool,
+                       std::vector<std::shared_ptr<Column>>* out) const {
+  std::vector<std::shared_ptr<Column>> flattened;
+  std::vector<std::shared_ptr<Field>> flattened_fields = field_->Flatten();
+  std::vector<std::shared_ptr<ChunkedArray>> flattened_data;
+  RETURN_NOT_OK(data_->Flatten(pool, &flattened_data));
+  // The two vectors are computed independently. A size mismatch (e.g. a field
+  // type disagreeing with the data, or an empty chunked array) must not index
+  // past the end of flattened_data. A DCHECK alone would not prevent that in
+  // release builds.
+  if (flattened_fields.size() != flattened_data.size()) {
+    std::stringstream ss;
+    ss << "Flattening column " << name() << " produced " << flattened_fields.size()
+       << " fields but " << flattened_data.size() << " data arrays";
+    return Status::Invalid(ss.str());
+  }
+  flattened.reserve(flattened_fields.size());
+  for (size_t i = 0; i < flattened_fields.size(); ++i) {
+    flattened.push_back(std::make_shared<Column>(flattened_fields[i], flattened_data[i]));
+  }
+  *out = flattened;
+  return Status::OK();
+}
+
bool Column::Equals(const Column& other) const {
if (!field_->Equals(other.field())) {
return false;
@@ -268,12 +314,28 @@ class SimpleTable : public Table {
return Table::Make(new_schema, columns_);
}
+  // Build a new table in which every column has been passed through
+  // Column::Flatten; struct columns may therefore contribute several columns.
+  Status Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const override {
+    std::vector<std::shared_ptr<Column>> out_columns;
+    for (const auto& col : columns_) {
+      std::vector<std::shared_ptr<Column>> pieces;
+      RETURN_NOT_OK(col->Flatten(pool, &pieces));
+      out_columns.insert(out_columns.end(), pieces.begin(), pieces.end());
+    }
+
+    // The new schema lists each flattened column's field, in column order.
+    std::vector<std::shared_ptr<Field>> out_fields;
+    out_fields.reserve(out_columns.size());
+    for (const auto& piece : out_columns) {
+      out_fields.push_back(piece->field());
+    }
+
+    // Schema-level metadata is carried over unchanged.
+    *out = Table::Make(std::make_shared<Schema>(out_fields, schema_->metadata()),
+                       out_columns);
+    return Status::OK();
+  }
+
Status Validate() const override {
+ // Make sure columns and schema are consistent
if (static_cast<int>(columns_.size()) != schema_->num_fields()) {
return Status::Invalid("Number of columns did not match schema");
}
-
- // Make sure columns are all the same length
for (int i = 0; i < num_columns(); ++i) {
const Column* col = columns_[i].get();
if (col == nullptr) {
@@ -281,6 +343,17 @@ class SimpleTable : public Table {
ss << "Column " << i << " was null";
return Status::Invalid(ss.str());
}
+ if (!col->field()->Equals(*schema_->field(i))) {
+ std::stringstream ss;
+ ss << "Column field " << i << " named " << col->name()
+ << " is inconsistent with schema";
+ return Status::Invalid(ss.str());
+ }
+ }
+
+ // Make sure columns are all the same length
+ for (int i = 0; i < num_columns(); ++i) {
+ const Column* col = columns_[i].get();
if (col->length() != num_rows_) {
std::stringstream ss;
ss << "Column " << i << " named " << col->name() << " expected length "
diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h
index 32af224..7fa207f 100644
--- a/cpp/src/arrow/table.h
+++ b/cpp/src/arrow/table.h
@@ -69,6 +69,13 @@ class ARROW_EXPORT ChunkedArray {
/// \brief Slice from offset until end of the chunked array
std::shared_ptr<ChunkedArray> Slice(int64_t offset) const;
+ /// \brief Flatten this chunked array as a vector of chunked arrays, one
+ /// for each struct field
+ ///
+ /// If this chunked array does not have a struct type, the output holds a
+ /// single chunked array with the same chunks and type as this one.
+ ///
+ /// \param[in] pool The pool for buffer allocations, if any
+ /// \param[out] out The resulting vector of chunked arrays
+ Status Flatten(MemoryPool* pool, std::vector<std::shared_ptr<ChunkedArray>>* out) const;
+
std::shared_ptr<DataType> type() const { return type_; }
bool Equals(const ChunkedArray& other) const;
@@ -133,6 +140,12 @@ class ARROW_EXPORT Column {
return std::make_shared<Column>(field_, data_->Slice(offset));
}
+ /// \brief Flatten this column as a vector of columns
+ ///
+ /// A struct-typed column yields one column per struct field, named
+ /// "<column name>.<field name>". A flattened field is nullable if either the
+ /// column's field or the struct child is nullable. Any other column yields a
+ /// single column with the same field and data.
+ ///
+ /// \param[in] pool The pool for buffer allocations, if any
+ /// \param[out] out The resulting vector of columns
+ Status Flatten(MemoryPool* pool, std::vector<std::shared_ptr<Column>>* out) const;
+
bool Equals(const Column& other) const;
bool Equals(const std::shared_ptr<Column>& other) const;
@@ -215,6 +228,13 @@ class ARROW_EXPORT Table {
virtual std::shared_ptr<Table> ReplaceSchemaMetadata(
const std::shared_ptr<const KeyValueMetadata>& metadata) const = 0;
+ /// \brief Flatten the table, producing a new Table. Any column with a
+ /// struct type will be flattened into multiple columns
+ ///
+ /// Flattening is one level deep: a struct field nested inside a struct
+ /// column remains a struct column in the result. Non-struct columns are
+ /// passed through unchanged.
+ ///
+ /// \param[in] pool The pool for buffer allocations, if any
+ /// \param[out] out The returned table
+ virtual Status Flatten(MemoryPool* pool, std::shared_ptr<Table>* out) const = 0;
+
/// \brief Perform any checks to validate the input arguments
virtual Status Validate() const = 0;
diff --git a/cpp/src/arrow/type-test.cc b/cpp/src/arrow/type-test.cc
index f62d14d..e9245d5 100644
--- a/cpp/src/arrow/type-test.cc
+++ b/cpp/src/arrow/type-test.cc
@@ -86,6 +86,33 @@ TEST(TestField, TestRemoveMetadata) {
ASSERT_TRUE(f2->metadata() == nullptr);
}
+TEST(TestField, TestFlatten) {
+  auto metadata = std::shared_ptr<KeyValueMetadata>(
+      new KeyValueMetadata({"foo", "bar"}, {"bizz", "buzz"}));
+
+  // A non-struct field flattens to a single copy of itself.
+  auto leaf = field("f0", int32(), true /* nullable */, metadata);
+  auto flat = leaf->Flatten();
+  ASSERT_EQ(flat.size(), 1);
+  ASSERT_TRUE(flat[0]->Equals(*leaf));
+
+  // Struct children are prefixed with the parent's name; a nullable parent
+  // makes every flattened child nullable.
+  auto other = field("f1", float64(), false /* nullable */);
+  auto parent = field("nest", struct_({leaf, other}));
+  flat = parent->Flatten();
+  ASSERT_EQ(flat.size(), 2);
+  ASSERT_TRUE(flat[0]->Equals(*field("nest.f0", int32(), true /* nullable */, metadata)));
+  ASSERT_TRUE(flat[1]->Equals(*field("nest.f1", float64(), true /* nullable */)));
+
+  // A non-nullable parent leaves each child's own nullability untouched.
+  parent = field("nest", struct_({leaf, other}), false /* nullable */);
+  flat = parent->Flatten();
+  ASSERT_EQ(flat.size(), 2);
+  ASSERT_TRUE(flat[0]->Equals(*field("nest.f0", int32(), true /* nullable */, metadata)));
+  ASSERT_TRUE(flat[1]->Equals(*field("nest.f1", float64(), false /* nullable */)));
+}
+
class TestSchema : public ::testing::Test {
public:
void SetUp() {}
diff --git a/cpp/src/arrow/type.cc b/cpp/src/arrow/type.cc
index 16e7585..2f6e718 100644
--- a/cpp/src/arrow/type.cc
+++ b/cpp/src/arrow/type.cc
@@ -42,6 +42,21 @@ std::shared_ptr<Field> Field::RemoveMetadata() const {
return std::make_shared<Field>(name_, type_, nullable_);
}
+// Expand a struct field into one field per child. Each child keeps its type
+// and metadata, is renamed "<parent>.<child>", and becomes nullable if the
+// parent is nullable. A non-struct field yields a single copy of itself.
+std::vector<std::shared_ptr<Field>> Field::Flatten() const {
+  if (type_->id() != Type::STRUCT) {
+    return {std::make_shared<Field>(*this)};
+  }
+  const std::string prefix = name() + ".";
+  std::vector<std::shared_ptr<Field>> children;
+  for (const auto& child : type_->children()) {
+    auto copy = std::make_shared<Field>(*child);
+    copy->name_ = prefix + copy->name_;
+    copy->nullable_ = copy->nullable_ || nullable_;
+    children.push_back(std::move(copy));
+  }
+  return children;
+}
+
bool Field::Equals(const Field& other) const {
if (this == &other) {
return true;
diff --git a/cpp/src/arrow/type.h b/cpp/src/arrow/type.h
index 9cd1d8f..915c0c7 100644
--- a/cpp/src/arrow/type.h
+++ b/cpp/src/arrow/type.h
@@ -236,6 +236,8 @@ class ARROW_EXPORT Field {
const std::shared_ptr<const KeyValueMetadata>& metadata) const;
std::shared_ptr<Field> RemoveMetadata() const;
+ /// \brief Flatten a struct field into one field per child, named
+ /// "<parent>.<child>"; a child becomes nullable if this field is nullable.
+ /// A non-struct field yields a single copy of itself.
+ std::vector<std::shared_ptr<Field>> Flatten() const;
+
bool Equals(const Field& other) const;
bool Equals(const std::shared_ptr<Field>& other) const;
diff --git a/cpp/src/arrow/util/bit-util-test.cc b/cpp/src/arrow/util/bit-util-test.cc
index ba4d8a3..c527244 100644
--- a/cpp/src/arrow/util/bit-util-test.cc
+++ b/cpp/src/arrow/util/bit-util-test.cc
@@ -22,6 +22,7 @@
#include <initializer_list>
#include <limits>
#include <memory>
+#include <valarray>
#include <vector>
#include <gtest/gtest.h>
@@ -92,7 +93,7 @@ void ASSERT_READER_VALUES(internal::BitmapReader& reader,
std::vector<int> value
// Assert equal contents of a memory area and a vector of bytes
void ASSERT_BYTES_EQ(const uint8_t* left, const std::vector<uint8_t>& right) {
auto left_array = std::vector<uint8_t>(left, left + right.size());
- ASSERT_EQ(std::vector<uint8_t>(std::begin(left_array),
std::end(left_array)), right);
+ ASSERT_EQ(left_array, right);
}
TEST(BitUtilTests, TestIsMultipleOf64) {
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 1dcff8a..12bbefb 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -258,6 +258,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CField] AddMetadata(
const shared_ptr[CKeyValueMetadata]& metadata)
shared_ptr[CField] RemoveMetadata()
+ vector[shared_ptr[CField]] Flatten()
cdef cppclass CStructType" arrow::StructType"(CDataType):
CStructType(const vector[shared_ptr[CField]]& fields)
@@ -437,6 +438,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
c_bool Equals(const CColumn& other)
+ CStatus Flatten(CMemoryPool* pool, vector[shared_ptr[CColumn]]* out)
+
shared_ptr[CField] field()
int64_t length()
@@ -495,6 +498,10 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CTable]* out)
CStatus RemoveColumn(int i, shared_ptr[CTable]* out)
+ CStatus Flatten(CMemoryPool* pool, shared_ptr[CTable]* out)
+
+ CStatus Validate()
+
shared_ptr[CTable] ReplaceSchemaMetadata(
const shared_ptr[CKeyValueMetadata]& metadata)
diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi
index a97fde2..c867657 100644
--- a/python/pyarrow/table.pxi
+++ b/python/pyarrow/table.pxi
@@ -271,13 +271,22 @@ cdef class Column:
def __repr__(self):
from pyarrow.compat import StringIO
result = StringIO()
- result.write(object.__repr__(self))
+ result.write('<Column name={0!r} type={1!r}>'
+ .format(self.name, self.type))
data = self.data
for i, chunk in enumerate(data.chunks):
result.write('\nchunk {0}: {1}'.format(i, repr(chunk)))
return result.getvalue()
+    def __richcmp__(Column self, Column other, int op):
+        # Columns only support (in)equality; ordering comparisons are rejected.
+        if op not in (cp.Py_EQ, cp.Py_NE):
+            raise TypeError('Invalid comparison')
+        is_equal = self.equals(other)
+        return is_equal if op == cp.Py_EQ else not is_equal
+
@staticmethod
def from_array(*args):
return column(*args)
@@ -315,6 +324,29 @@ cdef class Column:
casted_data = pyarrow_wrap_chunked_array(out.chunked_array())
return column(self.name, casted_data)
+    def flatten(self, MemoryPool memory_pool=None):
+        """
+        Flatten this Column. A struct-typed column is split into one column
+        per struct field (named "<column>.<field>"); any other column is
+        returned as a single-element list.
+
+        Parameters
+        ----------
+        memory_pool : MemoryPool, default None
+            For memory allocations, if required, otherwise use default pool
+
+        Returns
+        -------
+        result : List[Column]
+        """
+        cdef:
+            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+            vector[shared_ptr[CColumn]] pieces
+            size_t i
+
+        with nogil:
+            check_status(self.column.Flatten(pool, &pieces))
+
+        result = []
+        for i in range(pieces.size()):
+            result.append(pyarrow_wrap_column(pieces[i]))
+        return result
+
def to_pandas(self,
c_bool strings_to_categorical=False,
c_bool zero_copy_only=False,
@@ -843,6 +875,14 @@ cdef class Table:
)
return 0
+    def _validate(self):
+        """
+        Check this table's internal consistency.
+
+        Delegates to the C++ Table::Validate() (column count, per-column
+        field and length versus the schema); a non-OK status is raised
+        through check_status.
+        """
+        self._check_nullptr()
+        with nogil:
+            check_status(self.table.Validate())
+
def replace_schema_metadata(self, dict metadata=None):
"""
EXPERIMENTAL: Create shallow copy of table by replacing schema
@@ -867,6 +907,29 @@ cdef class Table:
return pyarrow_wrap_table(new_table)
+    def flatten(self, MemoryPool memory_pool=None):
+        """
+        Flatten this Table. Each column with a struct type is flattened
+        into one column per struct field. Other columns are left unchanged.
+
+        Flattening is one level deep: struct fields nested inside a struct
+        column remain struct columns in the result.
+
+        Parameters
+        ----------
+        memory_pool : MemoryPool, default None
+            For memory allocations, if required, otherwise use default pool
+
+        Returns
+        -------
+        result : Table
+        """
+        cdef:
+            shared_ptr[CTable] flattened
+            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
+
+        # Like _validate, refuse to work on a Table with no underlying C++
+        # table: dereferencing it inside the nogil block would crash the
+        # process instead of raising a Python exception.
+        self._check_nullptr()
+
+        with nogil:
+            check_status(self.table.Flatten(pool, &flattened))
+
+        return pyarrow_wrap_table(flattened)
+
def equals(self, Table other):
"""
Check if contents of two tables are equal
diff --git a/python/pyarrow/tests/test_schema.py b/python/pyarrow/tests/test_schema.py
index 6f99c12..9570506 100644
--- a/python/pyarrow/tests/test_schema.py
+++ b/python/pyarrow/tests/test_schema.py
@@ -254,6 +254,26 @@ def test_field_add_remove_metadata():
assert f5.equals(f6)
+def test_field_flatten():
+ f0 = pa.field('foo', pa.int32()).add_metadata({b'foo': b'bar'})
+ assert f0.flatten() == [f0]
+
+ f1 = pa.field('bar', pa.float64(), nullable=False)
+ ff = pa.field('ff', pa.struct([f0, f1]), nullable=False)
+ assert ff.flatten() == [
+ pa.field('ff.foo', pa.int32()).add_metadata({b'foo': b'bar'}),
+ pa.field('ff.bar', pa.float64(), nullable=False)] # XXX
+
+ # Nullable parent makes flattened child nullable
+ ff = pa.field('ff', pa.struct([f0, f1]))
+ assert ff.flatten() == [
+ pa.field('ff.foo', pa.int32()).add_metadata({b'foo': b'bar'}),
+ pa.field('ff.bar', pa.float64())]
+
+ fff = pa.field('fff', pa.struct([ff]))
+ assert fff.flatten() == [pa.field('fff.ff', pa.struct([f0, f1]))]
+
+
def test_schema_add_remove_metadata():
fields = [
pa.field('foo', pa.int32()),
diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py
index 100f2b0..9a0a482 100644
--- a/python/pyarrow/tests/test_table.py
+++ b/python/pyarrow/tests/test_table.py
@@ -97,6 +97,23 @@ def test_column_to_pandas():
assert series.iloc[0] == -10
+def test_column_flatten():
+ ty = pa.struct([pa.field('x', pa.int16()),
+ pa.field('y', pa.float32())])
+ a = pa.array([(1, 2.5), (3, 4.5), (5, 6.5)], type=ty)
+ col = pa.Column.from_array('foo', a)
+ x, y = col.flatten()
+ assert x == pa.column('foo.x', pa.array([1, 3, 5], type=pa.int16()))
+ assert y == pa.column('foo.y', pa.array([2.5, 4.5, 6.5],
+ type=pa.float32()))
+ # Empty column
+ a = pa.array([], type=ty)
+ col = pa.Column.from_array('foo', a)
+ x, y = col.flatten()
+ assert x == pa.column('foo.x', pa.array([], type=pa.int16()))
+ assert y == pa.column('foo.y', pa.array([], type=pa.float32()))
+
+
def test_recordbatch_basics():
data = [
pa.array(range(5)),
@@ -269,6 +286,7 @@ def test_table_basics():
pa.array([-10, -5, 0, 5, 10])
]
table = pa.Table.from_arrays(data, names=('a', 'b'))
+ table._validate()
assert len(table) == 5
assert table.num_rows == 5
assert table.num_columns == 2
@@ -367,6 +385,7 @@ def test_table_remove_column():
table = pa.Table.from_arrays(data, names=('a', 'b', 'c'))
t2 = table.remove_column(0)
+ t2._validate()
expected = pa.Table.from_arrays(data[1:], names=('b', 'c'))
assert t2.equals(expected)
@@ -379,12 +398,34 @@ def test_table_remove_column_empty():
table = pa.Table.from_arrays(data, names=['a'])
t2 = table.remove_column(0)
+ t2._validate()
assert len(t2) == len(table)
t3 = t2.add_column(0, table[0])
+ t3._validate()
assert t3.equals(table)
+def test_table_flatten():
+ ty1 = pa.struct([pa.field('x', pa.int16()),
+ pa.field('y', pa.float32())])
+ ty2 = pa.struct([pa.field('nest', ty1)])
+ a = pa.array([(1, 2.5), (3, 4.5)], type=ty1)
+ b = pa.array([((11, 12.5),), ((13, 14.5),)], type=ty2)
+ c = pa.array([False, True], type=pa.bool_())
+
+ table = pa.Table.from_arrays([a, b, c], names=['a', 'b', 'c'])
+ t2 = table.flatten()
+ t2._validate()
+ expected = pa.Table.from_arrays([
+ pa.array([1, 3], type=pa.int16()),
+ pa.array([2.5, 4.5], type=pa.float32()),
+ pa.array([(11, 12.5), (13, 14.5)], type=ty1),
+ c],
+ names=['a.x', 'a.y', 'b.nest', 'c'])
+ assert t2.equals(expected)
+
+
def test_concat_tables():
data = [
list(range(5)),
@@ -401,6 +442,7 @@ def test_concat_tables():
names=('a', 'b'))
result = pa.concat_tables([t1, t2])
+ result._validate()
assert len(result) == 10
expected = pa.Table.from_arrays([pa.array(x + y)
diff --git a/python/pyarrow/types.pxi b/python/pyarrow/types.pxi
index 9cd9bed..2922dc2 100644
--- a/python/pyarrow/types.pxi
+++ b/python/pyarrow/types.pxi
@@ -487,6 +487,20 @@ cdef class Field:
new_field = self.field.RemoveMetadata()
return pyarrow_wrap_field(new_field)
+    def flatten(self):
+        """
+        Flatten this field.
+
+        A struct field yields one field per child, each named
+        "<parent name>.<child name>"; any other field yields a
+        single-element list holding an equal field.
+
+        Returns
+        -------
+        fields : List[pyarrow.Field]
+        """
+        cdef:
+            vector[shared_ptr[CField]] children
+            size_t i
+        with nogil:
+            children = self.field.Flatten()
+        result = []
+        for i in range(children.size()):
+            result.append(pyarrow_wrap_field(children[i]))
+        return result
+
cdef class Schema:
--
To stop receiving notification emails like this one, please contact
[email protected].