Repository: arrow Updated Branches: refs/heads/master 6526a522d -> 3195948f6 (forced update)
ARROW-438: [C++/Python] Implement zero-data-copy record batch and table concatenation. This also fixes a bug in ChunkedArray::Equals. The bug was caught by the Python test suite, but the fix would benefit from more C++ unit tests. Author: Wes McKinney <[email protected]> Closes #274 from wesm/ARROW-438 and squashes the following commits: 1f39568 [Wes McKinney] py3 compatibility 2e76c5e [Wes McKinney] Implement arrow::ConcatenateTables and Python wrapper. Fix bug in ChunkedArray::Equals f3cb170 [Wes McKinney] Fix Cython compilation, verify pyarrow.Table.from_batches still works af28755 [Wes McKinney] Implement Table::FromRecordBatches Change-Id: I948b61d848c178edefad63465a74d9c303ad1f18 Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/3195948f Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/3195948f Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/3195948f Branch: refs/heads/master Commit: 3195948f64770520c7ed4c8a7888b33402ad6519 Parents: 1094d89 Author: Wes McKinney <[email protected]> Authored: Sun Jan 8 10:50:30 2017 -0500 Committer: Wes McKinney <[email protected]> Committed: Sun Jan 8 10:55:13 2017 -0500 ---------------------------------------------------------------------- cpp/CMakeLists.txt | 2 +- cpp/src/arrow/column.cc | 11 +- cpp/src/arrow/column.h | 2 + cpp/src/arrow/io/io-file-test.cc | 1 - cpp/src/arrow/table-test.cc | 88 ++++++++++++-- cpp/src/arrow/table.cc | 71 +++++++++++ cpp/src/arrow/table.h | 13 +- cpp/src/arrow/test-util.h | 43 ++++--- python/CMakeLists.txt | 3 + python/benchmarks/array.py | 7 +- python/doc/pandas.rst | 5 +- python/pyarrow/__init__.py | 2 +- python/pyarrow/array.pyx | 25 ++++ python/pyarrow/includes/libarrow.pxd | 16 +++ python/pyarrow/table.pyx | 147 ++++++++++++++++------- python/pyarrow/tests/test_convert_pandas.py | 6 +- python/pyarrow/tests/test_parquet.py | 12 +- python/pyarrow/tests/test_table.py | 27 +++++ python/setup.py | 5 +- 19 files changed, 395
insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 3522e5c..87b7841 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -76,7 +76,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") option(ARROW_JEMALLOC "Build the Arrow jemalloc-based allocator" - ON) + OFF) option(ARROW_BOOST_USE_SHARED "Rely on boost shared libraries where relevant" http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/cpp/src/arrow/column.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/column.cc b/cpp/src/arrow/column.cc index 3e89956..9cc0f57 100644 --- a/cpp/src/arrow/column.cc +++ b/cpp/src/arrow/column.cc @@ -45,7 +45,9 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const { int32_t this_start_idx = 0; int other_chunk_idx = 0; int32_t other_start_idx = 0; - while (this_chunk_idx < static_cast<int32_t>(chunks_.size())) { + + int64_t elements_compared = 0; + while (elements_compared < length_) { const std::shared_ptr<Array> this_array = chunks_[this_chunk_idx]; const std::shared_ptr<Array> other_array = other.chunk(other_chunk_idx); int32_t common_length = std::min( @@ -55,14 +57,21 @@ bool ChunkedArray::Equals(const ChunkedArray& other) const { return false; } + elements_compared += common_length; + // If we have exhausted the current chunk, proceed to the next one individually. 
if (this_start_idx + common_length == this_array->length()) { this_chunk_idx++; this_start_idx = 0; + } else { + this_start_idx += common_length; } + if (other_start_idx + common_length == other_array->length()) { other_chunk_idx++; other_start_idx = 0; + } else { + other_start_idx += common_length; } } return true; http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/cpp/src/arrow/column.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/column.h b/cpp/src/arrow/column.h index f716473..a28b266 100644 --- a/cpp/src/arrow/column.h +++ b/cpp/src/arrow/column.h @@ -48,6 +48,8 @@ class ARROW_EXPORT ChunkedArray { std::shared_ptr<Array> chunk(int i) const { return chunks_[i]; } + const ArrayVector& chunks() const { return chunks_; } + bool Equals(const ChunkedArray& other) const; bool Equals(const std::shared_ptr<ChunkedArray>& other) const; http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/cpp/src/arrow/io/io-file-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc index 378b60e..821e71d 100644 --- a/cpp/src/arrow/io/io-file-test.cc +++ b/cpp/src/arrow/io/io-file-test.cc @@ -301,7 +301,6 @@ class MyMemoryPool : public MemoryPool { return Status::OutOfMemory(ss.str()); } - return Status::OK(); } http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/cpp/src/arrow/table-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table-test.cc b/cpp/src/arrow/table-test.cc index 734b941..67c9f67 100644 --- a/cpp/src/arrow/table-test.cc +++ b/cpp/src/arrow/table-test.cc @@ -44,16 +44,20 @@ class TestTable : public TestBase { vector<shared_ptr<Field>> fields = {f0, f1, f2}; schema_ = std::make_shared<Schema>(fields); - columns_ = { - std::make_shared<Column>(schema_->field(0), MakePrimitive<Int32Array>(length)), - 
std::make_shared<Column>(schema_->field(1), MakePrimitive<UInt8Array>(length)), - std::make_shared<Column>(schema_->field(2), MakePrimitive<Int16Array>(length))}; + arrays_ = {MakePrimitive<Int32Array>(length), MakePrimitive<UInt8Array>(length), + MakePrimitive<Int16Array>(length)}; + + columns_ = {std::make_shared<Column>(schema_->field(0), arrays_[0]), + std::make_shared<Column>(schema_->field(1), arrays_[1]), + std::make_shared<Column>(schema_->field(2), arrays_[2])}; } protected: std::shared_ptr<Table> table_; shared_ptr<Schema> schema_; - vector<std::shared_ptr<Column>> columns_; + + std::vector<std::shared_ptr<Array>> arrays_; + std::vector<std::shared_ptr<Column>> columns_; }; TEST_F(TestTable, EmptySchema) { @@ -65,7 +69,7 @@ TEST_F(TestTable, EmptySchema) { } TEST_F(TestTable, Ctors) { - int length = 100; + const int length = 100; MakeExample1(length); std::string name = "data"; @@ -83,7 +87,7 @@ TEST_F(TestTable, Ctors) { } TEST_F(TestTable, Metadata) { - int length = 100; + const int length = 100; MakeExample1(length); std::string name = "data"; @@ -98,7 +102,7 @@ TEST_F(TestTable, Metadata) { TEST_F(TestTable, InvalidColumns) { // Check that columns are all the same length - int length = 100; + const int length = 100; MakeExample1(length); table_.reset(new Table("data", schema_, columns_, length - 1)); @@ -120,7 +124,7 @@ TEST_F(TestTable, InvalidColumns) { } TEST_F(TestTable, Equals) { - int length = 100; + const int length = 100; MakeExample1(length); std::string name = "data"; @@ -145,6 +149,72 @@ TEST_F(TestTable, Equals) { ASSERT_FALSE(table_->Equals(std::make_shared<Table>(name, schema_, other_columns))); } +TEST_F(TestTable, FromRecordBatches) { + const int32_t length = 10; + MakeExample1(length); + + auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_); + + std::shared_ptr<Table> result, expected; + ASSERT_OK(Table::FromRecordBatches("foo", {batch1}, &result)); + + expected = std::make_shared<Table>("foo", schema_, columns_); + 
ASSERT_TRUE(result->Equals(expected)); + + std::vector<std::shared_ptr<Column>> other_columns; + for (int i = 0; i < schema_->num_fields(); ++i) { + std::vector<std::shared_ptr<Array>> col_arrays = {arrays_[i], arrays_[i]}; + other_columns.push_back(std::make_shared<Column>(schema_->field(i), col_arrays)); + } + + ASSERT_OK(Table::FromRecordBatches("foo", {batch1, batch1}, &result)); + expected = std::make_shared<Table>("foo", schema_, other_columns); + ASSERT_TRUE(result->Equals(expected)); + + // Error states + std::vector<std::shared_ptr<RecordBatch>> empty_batches; + ASSERT_RAISES(Invalid, Table::FromRecordBatches("", empty_batches, &result)); + + std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)}; + auto other_schema = std::make_shared<Schema>(fields); + + std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]}; + auto batch2 = std::make_shared<RecordBatch>(other_schema, length, other_arrays); + ASSERT_RAISES(Invalid, Table::FromRecordBatches("", {batch1, batch2}, &result)); +} + +TEST_F(TestTable, ConcatenateTables) { + const int32_t length = 10; + + MakeExample1(length); + auto batch1 = std::make_shared<RecordBatch>(schema_, length, arrays_); + + // generate different data + MakeExample1(length); + auto batch2 = std::make_shared<RecordBatch>(schema_, length, arrays_); + + std::shared_ptr<Table> t1, t2, t3, result, expected; + ASSERT_OK(Table::FromRecordBatches("foo", {batch1}, &t1)); + ASSERT_OK(Table::FromRecordBatches("foo", {batch2}, &t2)); + + ASSERT_OK(ConcatenateTables("bar", {t1, t2}, &result)); + ASSERT_OK(Table::FromRecordBatches("bar", {batch1, batch2}, &expected)); + ASSERT_TRUE(result->Equals(expected)); + + // Error states + std::vector<std::shared_ptr<Table>> empty_tables; + ASSERT_RAISES(Invalid, ConcatenateTables("", empty_tables, &result)); + + std::vector<std::shared_ptr<Field>> fields = {schema_->field(0), schema_->field(1)}; + auto other_schema = std::make_shared<Schema>(fields); + + 
std::vector<std::shared_ptr<Array>> other_arrays = {arrays_[0], arrays_[1]}; + auto batch3 = std::make_shared<RecordBatch>(other_schema, length, other_arrays); + ASSERT_OK(Table::FromRecordBatches("", {batch3}, &t3)); + + ASSERT_RAISES(Invalid, ConcatenateTables("foo", {t1, t3}, &result)); +} + class TestRecordBatch : public TestBase {}; TEST_F(TestRecordBatch, Equals) { http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/cpp/src/arrow/table.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index 45f672e..b3563ea 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -77,6 +77,77 @@ Table::Table(const std::string& name, const std::shared_ptr<Schema>& schema, const std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows) : name_(name), schema_(schema), columns_(columns), num_rows_(num_rows) {} +Status Table::FromRecordBatches(const std::string& name, + const std::vector<std::shared_ptr<RecordBatch>>& batches, + std::shared_ptr<Table>* table) { + if (batches.size() == 0) { + return Status::Invalid("Must pass at least one record batch"); + } + + std::shared_ptr<Schema> schema = batches[0]->schema(); + + const int nbatches = static_cast<int>(batches.size()); + const int ncolumns = static_cast<int>(schema->num_fields()); + + for (int i = 1; i < nbatches; ++i) { + if (!batches[i]->schema()->Equals(schema)) { + std::stringstream ss; + ss << "Schema at index " << static_cast<int>(i) << " was different: \n" + << schema->ToString() << "\nvs\n" + << batches[i]->schema()->ToString(); + return Status::Invalid(ss.str()); + } + } + + std::vector<std::shared_ptr<Column>> columns(ncolumns); + std::vector<std::shared_ptr<Array>> column_arrays(nbatches); + + for (int i = 0; i < ncolumns; ++i) { + for (int j = 0; j < nbatches; ++j) { + column_arrays[j] = batches[j]->column(i); + } + columns[i] = std::make_shared<Column>(schema->field(i), column_arrays); + } + + *table = 
std::make_shared<Table>(name, schema, columns); + return Status::OK(); +} + +Status ConcatenateTables(const std::string& output_name, + const std::vector<std::shared_ptr<Table>>& tables, std::shared_ptr<Table>* table) { + if (tables.size() == 0) { return Status::Invalid("Must pass at least one table"); } + + std::shared_ptr<Schema> schema = tables[0]->schema(); + + const int ntables = static_cast<int>(tables.size()); + const int ncolumns = static_cast<int>(schema->num_fields()); + + for (int i = 1; i < ntables; ++i) { + if (!tables[i]->schema()->Equals(schema)) { + std::stringstream ss; + ss << "Schema at index " << static_cast<int>(i) << " was different: \n" + << schema->ToString() << "\nvs\n" + << tables[i]->schema()->ToString(); + return Status::Invalid(ss.str()); + } + } + + std::vector<std::shared_ptr<Column>> columns(ncolumns); + for (int i = 0; i < ncolumns; ++i) { + std::vector<std::shared_ptr<Array>> column_arrays; + for (int j = 0; j < ntables; ++j) { + const std::vector<std::shared_ptr<Array>>& chunks = + tables[j]->column(i)->data()->chunks(); + for (const auto& chunk : chunks) { + column_arrays.push_back(chunk); + } + } + columns[i] = std::make_shared<Column>(schema->field(i), column_arrays); + } + *table = std::make_shared<Table>(output_name, schema, columns); + return Status::OK(); +} + bool Table::Equals(const Table& other) const { if (name_ != other.name()) { return false; } if (!schema_->Equals(other.schema())) { return false; } http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/cpp/src/arrow/table.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 0f2418d..583847c 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -82,7 +82,13 @@ class ARROW_EXPORT Table { // same length as num_rows -- you can validate this using // Table::ValidateColumns Table(const std::string& name, const std::shared_ptr<Schema>& schema, - const 
std::vector<std::shared_ptr<Column>>& columns, int64_t num_rows); + const std::vector<std::shared_ptr<Column>>& columns, int64_t nubm_rows); + + // Construct table from RecordBatch, but only if all of the batch schemas are + // equal. Returns Status::Invalid if there is some problem + static Status FromRecordBatches(const std::string& name, + const std::vector<std::shared_ptr<RecordBatch>>& batches, + std::shared_ptr<Table>* table); // @returns: the table's name, if any (may be length 0) const std::string& name() const { return name_; } @@ -116,6 +122,11 @@ class ARROW_EXPORT Table { int64_t num_rows_; }; +// Construct table from multiple input tables. Return Status::Invalid if +// schemas are not equal +Status ARROW_EXPORT ConcatenateTables(const std::string& output_name, + const std::vector<std::shared_ptr<Table>>& tables, std::shared_ptr<Table>* table); + } // namespace arrow #endif // ARROW_TABLE_H http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/cpp/src/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index f2da824..b59809d 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -71,23 +71,6 @@ namespace arrow { -class TestBase : public ::testing::Test { - public: - void SetUp() { pool_ = default_memory_pool(); } - - template <typename ArrayType> - std::shared_ptr<Array> MakePrimitive(int32_t length, int32_t null_count = 0) { - auto data = std::make_shared<PoolBuffer>(pool_); - auto null_bitmap = std::make_shared<PoolBuffer>(pool_); - EXPECT_OK(data->Resize(length * sizeof(typename ArrayType::value_type))); - EXPECT_OK(null_bitmap->Resize(BitUtil::BytesForBits(length))); - return std::make_shared<ArrayType>(length, data, null_count, null_bitmap); - } - - protected: - MemoryPool* pool_; -}; - namespace test { template <typename T> @@ -253,6 +236,32 @@ Status MakeRandomBytePoolBuffer(int32_t length, MemoryPool* pool, } // 
namespace test +class TestBase : public ::testing::Test { + public: + void SetUp() { + pool_ = default_memory_pool(); + random_seed_ = 0; + } + + template <typename ArrayType> + std::shared_ptr<Array> MakePrimitive(int32_t length, int32_t null_count = 0) { + auto data = std::make_shared<PoolBuffer>(pool_); + const int64_t data_nbytes = length * sizeof(typename ArrayType::value_type); + EXPECT_OK(data->Resize(data_nbytes)); + + // Fill with random data + test::random_bytes(data_nbytes, random_seed_++, data->mutable_data()); + + auto null_bitmap = std::make_shared<PoolBuffer>(pool_); + EXPECT_OK(null_bitmap->Resize(BitUtil::BytesForBits(length))); + return std::make_shared<ArrayType>(length, data, null_count, null_bitmap); + } + + protected: + uint32_t random_seed_; + MemoryPool* pool_; +}; + template <typename TYPE, typename C_TYPE> void ArrayFromVector(const std::shared_ptr<DataType>& type, const std::vector<bool>& is_valid, const std::vector<C_TYPE>& values, http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 6c24772..e42c45d 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -70,6 +70,9 @@ include(SetupCxxFlags) # Add common flags set(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}") +# Suppress Cython warnings +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-variable") + # Determine compiler version include(CompilerInfo) http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/benchmarks/array.py ---------------------------------------------------------------------- diff --git a/python/benchmarks/array.py b/python/benchmarks/array.py index 4268f00..e22c0f7 100644 --- a/python/benchmarks/array.py +++ b/python/benchmarks/array.py @@ -49,10 +49,10 @@ class PandasConversionsToArrow(PandasConversionsBase): params = ((1, 10 ** 5, 10 ** 6, 10 ** 7), ('int64', 'float64', 
'float64_nans', 'str')) def time_from_series(self, n, dtype): - A.from_pandas_dataframe(self.data) + A.Table.from_pandas(self.data) def peakmem_from_series(self, n, dtype): - A.from_pandas_dataframe(self.data) + A.Table.from_pandas(self.data) class PandasConversionsFromArrow(PandasConversionsBase): @@ -61,7 +61,7 @@ class PandasConversionsFromArrow(PandasConversionsBase): def setup(self, n, dtype): super(PandasConversionsFromArrow, self).setup(n, dtype) - self.arrow_data = A.from_pandas_dataframe(self.data) + self.arrow_data = A.Table.from_pandas(self.data) def time_to_series(self, n, dtype): self.arrow_data.to_pandas() @@ -80,4 +80,3 @@ class ScalarAccess(object): def time_as_py(self, n): for i in range(n): self._array[i].as_py() - http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/doc/pandas.rst ---------------------------------------------------------------------- diff --git a/python/doc/pandas.rst b/python/doc/pandas.rst index 7c70074..c225d13 100644 --- a/python/doc/pandas.rst +++ b/python/doc/pandas.rst @@ -31,7 +31,7 @@ represent more data than a DataFrame, so a full conversion is not always possibl Conversion from a Table to a DataFrame is done by calling :meth:`pyarrow.table.Table.to_pandas`. The inverse is then achieved by using -:meth:`pyarrow.from_pandas_dataframe`. This conversion routine provides the +:meth:`pyarrow.Table.from_pandas`. This conversion routine provides the convience parameter ``timestamps_to_ms``. Although Arrow supports timestamps of different resolutions, Pandas only supports nanosecond timestamps and most other systems (e.g. Parquet) only work on millisecond timestamps. This parameter @@ -45,7 +45,7 @@ conversion. 
df = pd.DataFrame({"a": [1, 2, 3]}) # Convert from Pandas to Arrow - table = pa.from_pandas_dataframe(df) + table = pa.Table.from_pandas(df) # Convert back to Pandas df_new = table.to_pandas() @@ -111,4 +111,3 @@ Arrow -> Pandas Conversion +-------------------------------------+--------------------------------------------------------+ | ``TIMESTAMP(unit=*)`` | ``pd.Timestamp`` (``np.datetime64[ns]``) | +-------------------------------------+--------------------------------------------------------+ - http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/pyarrow/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py index 02b2b06..d25cdd4 100644 --- a/python/pyarrow/__init__.py +++ b/python/pyarrow/__init__.py @@ -56,4 +56,4 @@ from pyarrow.schema import (null, bool_, list_, struct, field, DataType, Field, Schema, schema) -from pyarrow.table import Column, RecordBatch, Table, from_pandas_dataframe +from pyarrow.table import Column, RecordBatch, Table, concat_tables http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/pyarrow/array.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/array.pyx b/python/pyarrow/array.pyx index c178d5c..266768f 100644 --- a/python/pyarrow/array.pyx +++ b/python/pyarrow/array.pyx @@ -91,6 +91,29 @@ cdef class Array: """ return from_pandas_series(obj, mask) + @staticmethod + def from_list(object list_obj, DataType type=None): + """ + Convert Python list to Arrow array + + Parameters + ---------- + list_obj : array_like + + Returns + ------- + pyarrow.array.Array + """ + cdef: + shared_ptr[CArray] sp_array + + if type is None: + check_status(pyarrow.ConvertPySequence(list_obj, &sp_array)) + else: + raise NotImplementedError() + + return box_arrow_array(sp_array) + property null_count: def __get__(self): @@ -348,3 +371,5 @@ cdef object series_as_ndarray(object obj): 
result = obj return result + +from_pylist = Array.from_list http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/pyarrow/includes/libarrow.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 3cdfe49..b0f971d 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -182,6 +182,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CColumn(const shared_ptr[CField]& field, const vector[shared_ptr[CArray]]& chunks) + c_bool Equals(const CColumn& other) + c_bool Equals(const shared_ptr[CColumn]& other) + int64_t length() int64_t null_count() const c_string& name() @@ -207,14 +210,27 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil: CTable(const c_string& name, const shared_ptr[CSchema]& schema, const vector[shared_ptr[CColumn]]& columns) + @staticmethod + CStatus FromRecordBatches( + const c_string& name, + const vector[shared_ptr[CRecordBatch]]& batches, + shared_ptr[CTable]* table) + int num_columns() int num_rows() + c_bool Equals(const CTable& other) + c_bool Equals(const shared_ptr[CTable]& other) + const c_string& name() shared_ptr[CSchema] schema() shared_ptr[CColumn] column(int i) + CStatus ConcatenateTables(const c_string& output_name, + const vector[shared_ptr[CTable]]& tables, + shared_ptr[CTable]* result) + cdef extern from "arrow/ipc/metadata.h" namespace "arrow::ipc" nogil: cdef cppclass SchemaMessage: http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/pyarrow/table.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index 9255431..3a04651 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -155,6 +155,31 @@ cdef class Column: return pd.Series(PyObject_to_object(arr), name=self.name) + def equals(self, Column other): + """ + Check if contents of two 
columns are equal + + Parameters + ---------- + other : pyarrow.Column + + Returns + ------- + are_equal : boolean + """ + cdef: + CColumn* my_col = self.column + CColumn* other_col = other.column + c_bool result + + self._check_nullptr() + other._check_nullptr() + + with nogil: + result = my_col.Equals(deref(other_col)) + + return result + def to_pylist(self): """ Convert to a list of native Python objects. @@ -343,10 +368,18 @@ cdef class RecordBatch: return arr def equals(self, RecordBatch other): + cdef: + CRecordBatch* my_batch = self.batch + CRecordBatch* other_batch = other.batch + c_bool result + self._check_nullptr() other._check_nullptr() - return self.batch.Equals(deref(other.batch)) + with nogil: + result = my_batch.Equals(deref(other_batch)) + + return result def to_pydict(self): """ @@ -424,7 +457,6 @@ cdef class RecordBatch: """ cdef: Array arr - RecordBatch result c_string c_name shared_ptr[CSchema] schema shared_ptr[CRecordBatch] batch @@ -442,11 +474,7 @@ cdef class RecordBatch: c_arrays.push_back(arr.sp_array) batch.reset(new CRecordBatch(schema, num_rows, c_arrays)) - - result = RecordBatch() - result.init(batch) - - return result + return batch_from_cbatch(batch) cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads): @@ -498,6 +526,31 @@ cdef class Table: raise ReferenceError("Table object references a NULL pointer." "Not initialized.") + def equals(self, Table other): + """ + Check if contents of two tables are equal + + Parameters + ---------- + other : pyarrow.Table + + Returns + ------- + are_equal : boolean + """ + cdef: + CTable* my_table = self.table + CTable* other_table = other.table + c_bool result + + self._check_nullptr() + other._check_nullptr() + + with nogil: + result = my_table.Equals(deref(other_table)) + + return result + @classmethod def from_pandas(cls, df, name=None, timestamps_to_ms=False): """ @@ -527,7 +580,7 @@ cdef class Table: ... 'int': [1, 2], ... 'str': ['a', 'b'] ... 
}) - >>> pa.table.from_pandas_dataframe(df) + >>> pa.Table.from_pandas(df) <pyarrow.table.Table object at 0x7f05d1fb1b40> """ names, arrays = _dataframe_to_arrays(df, name=name, @@ -559,7 +612,6 @@ cdef class Table: c_string c_name vector[shared_ptr[CField]] fields vector[shared_ptr[CColumn]] columns - Table result shared_ptr[CSchema] schema shared_ptr[CTable] table @@ -577,14 +629,10 @@ cdef class Table: c_name = tobytes(name) table.reset(new CTable(c_name, schema, columns)) - - result = Table() - result.init(table) - - return result + return table_from_ctable(table) @staticmethod - def from_batches(batches): + def from_batches(batches, name=None): """ Construct a Table from a list of Arrow RecordBatches @@ -594,39 +642,21 @@ cdef class Table: batches: list of RecordBatch RecordBatch list to be converted, schemas must be equal """ - cdef: - vector[shared_ptr[CArray]] c_array_chunks - vector[shared_ptr[CColumn]] c_columns + vector[shared_ptr[CRecordBatch]] c_batches shared_ptr[CTable] c_table - Array arr - Schema schema - - import pandas as pd + RecordBatch batch + Table table + c_string c_name - schema = batches[0].schema + c_name = b'' if name is None else tobytes(name) - # check schemas are equal - for other in batches[1:]: - if not schema.equals(other.schema): - raise ArrowException("Error converting list of RecordBatches " - "to DataFrame, not all schemas are equal: {%s} != {%s}" - % (str(schema), str(other.schema))) + for batch in batches: + c_batches.push_back(batch.sp_batch) - cdef int K = batches[0].num_columns + with nogil: + check_status(CTable.FromRecordBatches(c_name, c_batches, &c_table)) - # create chunked columns from the batches - c_columns.resize(K) - for i in range(K): - for batch in batches: - arr = batch[i] - c_array_chunks.push_back(arr.sp_array) - c_columns[i].reset(new CColumn(schema.sp_schema.get().field(i), - c_array_chunks)) - c_array_chunks.clear() - - # create a Table from columns and convert to DataFrame - c_table.reset(new CTable('', 
schema.sp_schema, c_columns)) table = Table() table.init(c_table) return table @@ -760,9 +790,40 @@ cdef class Table: return (self.num_rows, self.num_columns) +def concat_tables(tables, output_name=None): + """ + Perform zero-copy concatenation of pyarrow.Table objects. Raises exception + if all of the Table schemas are not the same + + Parameters + ---------- + tables : iterable of pyarrow.Table objects + output_name : string, default None + A name for the output table, if any + """ + cdef: + vector[shared_ptr[CTable]] c_tables + shared_ptr[CTable] c_result + Table table + c_string c_name + + c_name = b'' if output_name is None else tobytes(output_name) + + for table in tables: + c_tables.push_back(table.sp_table) + + with nogil: + check_status(ConcatenateTables(c_name, c_tables, &c_result)) + + return table_from_ctable(c_result) + + cdef api object table_from_ctable(const shared_ptr[CTable]& ctable): cdef Table table = Table() table.init(ctable) return table -from_pandas_dataframe = Table.from_pandas +cdef api object batch_from_cbatch(const shared_ptr[CRecordBatch]& cbatch): + cdef RecordBatch batch = RecordBatch() + batch.init(cbatch) + return batch http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/pyarrow/tests/test_convert_pandas.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_convert_pandas.py b/python/pyarrow/tests/test_convert_pandas.py index bb9f0b3..12e7a08 100644 --- a/python/pyarrow/tests/test_convert_pandas.py +++ b/python/pyarrow/tests/test_convert_pandas.py @@ -61,7 +61,7 @@ class TestPandasConversion(unittest.TestCase): def _check_pandas_roundtrip(self, df, expected=None, nthreads=1, timestamps_to_ms=False): - table = A.from_pandas_dataframe(df, timestamps_to_ms=timestamps_to_ms) + table = A.Table.from_pandas(df, timestamps_to_ms=timestamps_to_ms) result = table.to_pandas(nthreads=nthreads) if expected is None: expected = df @@ -193,7 +193,7 @@ class 
TestPandasConversion(unittest.TestCase): values = [u('qux'), b'foo', None, 'bar', 'qux', np.nan] df = pd.DataFrame({'strings': values}) - table = A.from_pandas_dataframe(df) + table = A.Table.from_pandas(df) assert table[0].type == A.binary() values2 = [b'qux', b'foo', None, b'bar', b'qux', np.nan] @@ -245,7 +245,7 @@ class TestPandasConversion(unittest.TestCase): None, datetime.date(1970, 1, 1), datetime.date(2040, 2, 26)]}) - table = A.from_pandas_dataframe(df) + table = A.Table.from_pandas(df) result = table.to_pandas() expected = df.copy() expected['date'] = pd.to_datetime(df['date']) http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/pyarrow/tests/test_parquet.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 7c45732..0fb913c 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -79,7 +79,7 @@ def test_pandas_parquet_2_0_rountrip(tmpdir): 'empty_str': [''] * size }) filename = tmpdir.join('pandas_rountrip.parquet') - arrow_table = A.from_pandas_dataframe(df, timestamps_to_ms=True) + arrow_table = A.Table.from_pandas(df, timestamps_to_ms=True) A.parquet.write_table(arrow_table, filename.strpath, version="2.0") table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() @@ -107,7 +107,7 @@ def test_pandas_parquet_1_0_rountrip(tmpdir): 'empty_str': [''] * size }) filename = tmpdir.join('pandas_rountrip.parquet') - arrow_table = A.from_pandas_dataframe(df) + arrow_table = A.Table.from_pandas(df) A.parquet.write_table(arrow_table, filename.strpath, version="1.0") table_read = pq.read_table(filename.strpath) df_read = table_read.to_pandas() @@ -126,7 +126,7 @@ def test_pandas_column_selection(tmpdir): 'uint16': np.arange(size, dtype=np.uint16) }) filename = tmpdir.join('pandas_rountrip.parquet') - arrow_table = A.from_pandas_dataframe(df) + arrow_table = 
A.Table.from_pandas(df) A.parquet.write_table(arrow_table, filename.strpath) table_read = pq.read_table(filename.strpath, columns=['uint8']) df_read = table_read.to_pandas() @@ -155,7 +155,7 @@ def _test_dataframe(size=10000): @parquet def test_pandas_parquet_native_file_roundtrip(tmpdir): df = _test_dataframe(10000) - arrow_table = A.from_pandas_dataframe(df) + arrow_table = A.Table.from_pandas(df) imos = paio.InMemoryOutputStream() pq.write_table(arrow_table, imos, version="2.0") buf = imos.get_result() @@ -176,7 +176,7 @@ def test_pandas_parquet_pyfile_roundtrip(tmpdir): 'strings': ['foo', 'bar', None, 'baz', 'qux'] }) - arrow_table = A.from_pandas_dataframe(df) + arrow_table = A.Table.from_pandas(df) with open(filename, 'wb') as f: A.parquet.write_table(arrow_table, f, version="1.0") @@ -206,7 +206,7 @@ def test_pandas_parquet_configuration_options(tmpdir): 'bool': np.random.randn(size) > 0 }) filename = tmpdir.join('pandas_rountrip.parquet') - arrow_table = A.from_pandas_dataframe(df) + arrow_table = A.Table.from_pandas(df) for use_dictionary in [True, False]: A.parquet.write_table( http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/pyarrow/tests/test_table.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_table.py b/python/pyarrow/tests/test_table.py index 9985b3e..6f00c73 100644 --- a/python/pyarrow/tests/test_table.py +++ b/python/pyarrow/tests/test_table.py @@ -111,6 +111,33 @@ def test_table_basics(): assert chunk is not None +def test_concat_tables(): + data = [ + list(range(5)), + [-10., -5., 0., 5., 10.] + ] + data2 = [ + list(range(5, 10)), + [1., 2., 3., 4., 5.] 
+ ] + + t1 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x) + for x in data], 'table_name') + t2 = pa.Table.from_arrays(('a', 'b'), [pa.from_pylist(x) + for x in data2], 'table_name') + + result = pa.concat_tables([t1, t2], output_name='foo') + assert result.name == 'foo' + assert len(result) == 10 + + expected = pa.Table.from_arrays( + ('a', 'b'), [pa.from_pylist(x + y) + for x, y in zip(data, data2)], + 'foo') + + assert result.equals(expected) + + def test_table_pandas(): data = [ pa.from_pylist(range(5)), http://git-wip-us.apache.org/repos/asf/arrow/blob/3195948f/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index 2e595e2..3829a79 100644 --- a/python/setup.py +++ b/python/setup.py @@ -143,7 +143,10 @@ class build_ext(_build_ext): cmake_options + [source]) self.spawn(cmake_command) - args = ['make', 'VERBOSE=1'] + args = ['make'] + if os.environ.get('PYARROW_BUILD_VERBOSE', '0') == '1': + args.append('VERBOSE=1') + if 'PYARROW_PARALLEL' in os.environ: args.append('-j{0}'.format(os.environ['PYARROW_PARALLEL'])) self.spawn(args)
