Repository: arrow Updated Branches: refs/heads/master f7f915d90 -> d99958dd3
ARROW-452: [C++/Python] Incorporate C++ and Python codebases for Feather file format

The goal for this patch is to provide an eventual migration path for Feather (https://github.com/wesm/feather) users to use the batch and streaming Arrow file formats internally. Eventually the Feather metadata can be deprecated, but we will need to wait for the R community to build and ship Arrow bindings for R before that can happen. In the meantime, we won't need to maintain multiple Python/C++ codebases for the Python side of things.

The test suite isn't yet passing because support for timestamps with time zones has not been implemented in the conversion to pandas.DataFrame. I will add that when I can, but this can be reviewed in the meantime.

I would upload a Gerrit code review, but there are some access control settings on gerrit.cloudera.org that need changing.

Author: Wes McKinney <wes.mckin...@twosigma.com>

Closes #361 from wesm/ARROW-452 and squashes the following commits:

b7bfd30 [Wes McKinney] Add missing license header
06cbdca [Wes McKinney] Fix -Wconversion error
244959c [Wes McKinney] Mark datetime+tz tests as xfail
9a95094 [Wes McKinney] Incorporate Feather C++ and Python codebases and do associated refactoring to maximize code reuse with IPC reader/writer classes. Get C++ test suite passing.
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/d99958dd Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/d99958dd Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/d99958dd Branch: refs/heads/master Commit: d99958dd3de0ac4fd6a99127d62657249c494448 Parents: f7f915d Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Sat Mar 11 16:58:45 2017 -0500 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Sat Mar 11 16:58:45 2017 -0500 ---------------------------------------------------------------------- cpp/src/arrow/ipc/CMakeLists.txt | 22 +- cpp/src/arrow/ipc/api.h | 1 + cpp/src/arrow/ipc/feather-internal.h | 232 +++++++++ cpp/src/arrow/ipc/feather-test.cc | 437 +++++++++++++++++ cpp/src/arrow/ipc/feather.cc | 729 +++++++++++++++++++++++++++++ cpp/src/arrow/ipc/feather.fbs | 147 ++++++ cpp/src/arrow/ipc/feather.h | 109 +++++ python/CMakeLists.txt | 1 + python/pyarrow/_feather.pyx | 158 +++++++ python/pyarrow/feather.py | 118 +++++ python/pyarrow/table.pyx | 5 + python/pyarrow/tests/test_feather.py | 379 +++++++++++++++ python/setup.py | 1 + python/src/pyarrow/adapters/pandas.cc | 83 +++- 14 files changed, 2394 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/cpp/src/arrow/ipc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index 08da0a1..09a959b 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -25,11 +25,12 @@ set(ARROW_IPC_SHARED_LINK_LIBS ) set(ARROW_IPC_TEST_LINK_LIBS - arrow_io_static - arrow_ipc_static) + arrow_ipc_static + arrow_io_static) set(ARROW_IPC_SRCS adapter.cc + feather.cc json.cc json-internal.cc metadata.cc @@ -59,6 +60,10 @@ if(FLATBUFFERS_VENDORED) add_dependencies(arrow_ipc_objlib 
flatbuffers_ep) endif() +ADD_ARROW_TEST(feather-test) +ARROW_TEST_LINK_LIBRARIES(feather-test + ${ARROW_IPC_TEST_LINK_LIBS}) + ADD_ARROW_TEST(ipc-adapter-test) ARROW_TEST_LINK_LIBRARIES(ipc-adapter-test ${ARROW_IPC_TEST_LINK_LIBS}) @@ -105,14 +110,20 @@ if (ARROW_BUILD_TESTS) endif() # make clean will delete the generated file -set_source_files_properties(Metadata_generated.h PROPERTIES GENERATED TRUE) +set_source_files_properties(Message_generated.h PROPERTIES GENERATED TRUE) +set_source_files_properties(feather_generated.h PROPERTIES GENERATED TRUE) +set_source_files_properties(File_generated.h PROPERTIES GENERATED TRUE) set(OUTPUT_DIR ${CMAKE_SOURCE_DIR}/src/arrow/ipc) -set(FBS_OUTPUT_FILES "${OUTPUT_DIR}/Message_generated.h") +set(FBS_OUTPUT_FILES + "${OUTPUT_DIR}/File_generated.h" + "${OUTPUT_DIR}/Message_generated.h" + "${OUTPUT_DIR}/feather_generated.h") set(FBS_SRC ${CMAKE_SOURCE_DIR}/../format/Message.fbs - ${CMAKE_SOURCE_DIR}/../format/File.fbs) + ${CMAKE_SOURCE_DIR}/../format/File.fbs + ${CMAKE_CURRENT_SOURCE_DIR}/feather.fbs) foreach(FIL ${FBS_SRC}) get_filename_component(ABS_FIL ${FIL} ABSOLUTE) @@ -139,6 +150,7 @@ add_dependencies(arrow_ipc_objlib metadata_fbs) install(FILES adapter.h api.h + feather.h json.h metadata.h reader.h http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/cpp/src/arrow/ipc/api.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/api.h b/cpp/src/arrow/ipc/api.h index cb85421..ad7cd84 100644 --- a/cpp/src/arrow/ipc/api.h +++ b/cpp/src/arrow/ipc/api.h @@ -19,6 +19,7 @@ #define ARROW_IPC_API_H #include "arrow/ipc/adapter.h" +#include "arrow/ipc/feather.h" #include "arrow/ipc/json.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/reader.h" http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/cpp/src/arrow/ipc/feather-internal.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/feather-internal.h 
b/cpp/src/arrow/ipc/feather-internal.h
new file mode 100644
index 0000000..10b0cfd
--- /dev/null
+++ b/cpp/src/arrow/ipc/feather-internal.h
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Internal helpers for building and reading the metadata of the "Feather"
+/// file format, originally created at http://github.com/wesm/feather
+
+#ifndef ARROW_IPC_FEATHER_INTERNAL_H
+#define ARROW_IPC_FEATHER_INTERNAL_H
+
+#include <cstdint>
+#include <iostream>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "flatbuffers/flatbuffers.h"
+
+#include "arrow/buffer.h"
+#include "arrow/ipc/feather.h"
+#include "arrow/ipc/feather_generated.h"
+#include "arrow/type.h"
+
+namespace arrow {
+namespace ipc {
+namespace feather {
+
+typedef std::vector<flatbuffers::Offset<fbs::Column>> ColumnVector;
+typedef flatbuffers::FlatBufferBuilder FBB;
+typedef flatbuffers::Offset<flatbuffers::String> FBString;
+
+struct ColumnType {
+  enum type { PRIMITIVE, CATEGORY, TIMESTAMP, DATE, TIME };
+};
+
+struct ArrayMetadata {
+  // Zero-initialize so a default-constructed instance never holds garbage
+  ArrayMetadata() : type(), offset(0), length(0), null_count(0), total_bytes(0) {}
+
+  ArrayMetadata(fbs::Type type, int64_t offset, int64_t length, int64_t null_count,
+      int64_t total_bytes)
+      : type(type),
+        offset(offset),
+        length(length),
+        null_count(null_count),
+        total_bytes(total_bytes) {}
+
+  bool Equals(const ArrayMetadata& other) const {
+    return this->type == other.type && this->offset == other.offset &&
+           this->length == other.length && this->null_count == other.null_count &&
+           this->total_bytes == other.total_bytes;
+  }
+
+  fbs::Type type;
+  int64_t offset;
+  int64_t length;
+  int64_t null_count;
+  int64_t total_bytes;
+};
+
+struct CategoryMetadata {
+  ArrayMetadata levels;
+  bool ordered;
+};
+
+struct TimestampMetadata {
+  TimeUnit unit;
+
+  // A timezone name known to the Olson timezone database. For display purposes
+  // because the actual data is all UTC
+  std::string timezone;
+};
+
+struct TimeMetadata {
+  TimeUnit unit;
+};
+
+static constexpr const char* kFeatherMagicBytes = "FEA1";
+static constexpr const int kFeatherDefaultAlignment = 8;
+
+class ColumnBuilder;
+
+/// Accumulates column metadata and serializes the table's flatbuffer metadata
+class TableBuilder {
+ public:
+  explicit TableBuilder(int64_t num_rows);
+  ~TableBuilder() = default;
+
+  FBB& fbb();
+  Status Finish();
+  std::shared_ptr<Buffer> GetBuffer() const;
+
+  std::unique_ptr<ColumnBuilder> AddColumn(const std::string& name);
+  void SetDescription(const std::string& description);
+  void SetNumRows(int64_t num_rows);
+  void add_column(const flatbuffers::Offset<fbs::Column>& col);
+
+ private:
+  flatbuffers::FlatBufferBuilder fbb_;
+  ColumnVector columns_;
+
+  friend class ColumnBuilder;
+
+  bool finished_;
+  std::string description_;
+  int64_t num_rows_;
+};
+
+/// Read-only accessor for the flatbuffer-encoded metadata of a Feather table
+class TableMetadata {
+ public:
+  TableMetadata() : table_(nullptr) {}
+  ~TableMetadata() = default;
+
+  /// Verify and parse the metadata held in `buffer`. The buffer is retained
+  /// because the parsed table points into its memory.
+  Status Open(const std::shared_ptr<Buffer>& buffer) {
+    // The metadata is read from a file and may be corrupt; check its offsets
+    // before dereferencing anything.
+    flatbuffers::Verifier verifier(buffer->data(), static_cast<size_t>(buffer->size()));
+    if (!fbs::VerifyCTableBuffer(verifier)) {
+      return Status::IOError("Feather file metadata failed flatbuffer verification");
+    }
+
+    metadata_buffer_ = buffer;
+    table_ = fbs::GetCTable(buffer->data());
+
+    if (table_->version() < kFeatherVersion) {
+      // Diagnostics go to stderr so they do not mix with program output
+      std::cerr << "This Feather file is old"
+                << " and will not be readable beyond the 0.3.0 release" << std::endl;
+    }
+    return Status::OK();
+  }
+
+  bool HasDescription() const { return table_->description() != nullptr; }
+
+  std::string GetDescription() const {
+    if (!HasDescription()) { return std::string(""); }
+    return table_->description()->str();
+  }
+
+  int version() const { return table_->version(); }
+  int64_t num_rows() const { return table_->num_rows(); }
+  int64_t num_columns() const { return table_->columns()->size(); }
+
+  const fbs::Column* column(int i) const { return table_->columns()->Get(i); }
+
+ private:
+  std::shared_ptr<Buffer> metadata_buffer_;
+  const fbs::CTable* table_;
+};
+
+static inline flatbuffers::Offset<fbs::PrimitiveArray> GetPrimitiveArray(
+    FBB& fbb, const ArrayMetadata& array) {
+  return fbs::CreatePrimitiveArray(fbb, array.type, fbs::Encoding_PLAIN, array.offset,
+      array.length, array.null_count, array.total_bytes);
+}
+
+static inline fbs::TimeUnit ToFlatbufferEnum(TimeUnit unit) {
+  return static_cast<fbs::TimeUnit>(static_cast<int>(unit));
+}
+
+static inline TimeUnit FromFlatbufferEnum(fbs::TimeUnit unit) {
+  return static_cast<TimeUnit>(static_cast<int>(unit));
+}
+
+// Convert Feather enums to Flatbuffer enums
+
+const fbs::TypeMetadata COLUMN_TYPE_ENUM_MAPPING[] = {
+    fbs::TypeMetadata_NONE,               // PRIMITIVE
+    fbs::TypeMetadata_CategoryMetadata,   // CATEGORY
+    fbs::TypeMetadata_TimestampMetadata,  // TIMESTAMP
+    fbs::TypeMetadata_DateMetadata,       // DATE
+    fbs::TypeMetadata_TimeMetadata        // TIME
+};
+
+static inline fbs::TypeMetadata ToFlatbufferEnum(ColumnType::type column_type) {
+  return COLUMN_TYPE_ENUM_MAPPING[column_type];
+}
+
+static inline void FromFlatbuffer(const fbs::PrimitiveArray* values, ArrayMetadata* out) {
+  out->type = values->type();
+  out->offset = values->offset();
+  out->length = values->length();
+  out->null_count = values->null_count();
+  out->total_bytes = values->total_bytes();
+}
+
+/// Builds the metadata for a single column; Finish() appends it to the parent
+/// TableBuilder
+class ColumnBuilder {
+ public:
+  ColumnBuilder(TableBuilder* parent, const std::string& name);
+  ~ColumnBuilder() = default;
+
+  flatbuffers::Offset<void> CreateColumnMetadata();
+
+  Status Finish();
+  void SetValues(const ArrayMetadata& values);
+  void SetUserMetadata(const std::string& data);
+  void SetCategory(const ArrayMetadata& levels, bool ordered = false);
+  void SetTimestamp(TimeUnit unit);
+  void SetTimestamp(TimeUnit unit, const std::string& timezone);
+  void SetDate();
+  void SetTime(TimeUnit unit);
+  FBB& fbb();
+
+ private:
+  TableBuilder* parent_;
+
+  std::string name_;
+  ArrayMetadata values_;
+  std::string user_metadata_;
+
+  // Column metadata
+
+  // Is this a primitive type, or one of the types having metadata? Default is
+  // primitive
+  ColumnType::type type_;
+
+  // Type-specific metadata union
+  CategoryMetadata meta_category_;
+  TimeMetadata meta_time_;
+
+  TimestampMetadata meta_timestamp_;
+
+  FBB* fbb_;
+};
+
+}  // namespace feather
+}  // namespace ipc
+}  // namespace arrow
+
+#endif  // ARROW_IPC_FEATHER_INTERNAL_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/cpp/src/arrow/ipc/feather-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather-test.cc b/cpp/src/arrow/ipc/feather-test.cc
new file mode 100644
index 0000000..b73246b
--- /dev/null
+++ b/cpp/src/arrow/ipc/feather-test.cc
@@ -0,0 +1,437 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <random>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "arrow/io/memory.h"
+#include "arrow/ipc/feather-internal.h"
+#include "arrow/ipc/feather.h"
+#include "arrow/ipc/test-common.h"
+#include "arrow/loader.h"
+#include "arrow/pretty_print.h"
+#include "arrow/test-util.h"
+
+namespace arrow {
+namespace ipc {
+namespace feather {
+
+// Fixture that builds table metadata directly with TableBuilder and reads it
+// back through TableMetadata.
+class TestTableBuilder : public ::testing::Test {
+ public:
+  void SetUp() { tb_.reset(new TableBuilder(1000)); }
+
+  virtual void Finish() {
+    ASSERT_OK(tb_->Finish());
+
+    table_.reset(new TableMetadata());
+    ASSERT_OK(table_->Open(tb_->GetBuffer()));
+  }
+
+ protected:
+  std::unique_ptr<TableBuilder> tb_;
+  std::unique_ptr<TableMetadata> table_;
+};
+
+TEST_F(TestTableBuilder, Version) {
+  Finish();
+  ASSERT_EQ(kFeatherVersion, table_->version());
+}
+
+TEST_F(TestTableBuilder, EmptyTable) {
+  Finish();
+
+  ASSERT_FALSE(table_->HasDescription());
+  ASSERT_EQ("", table_->GetDescription());
+  ASSERT_EQ(1000, table_->num_rows());
+  ASSERT_EQ(0, table_->num_columns());
+}
+
+TEST_F(TestTableBuilder, SetDescription) {
+  std::string desc("this is some good data");
+  tb_->SetDescription(desc);
+  Finish();
+  ASSERT_TRUE(table_->HasDescription());
+  ASSERT_EQ(desc, table_->GetDescription());
+}
+
+// Compare every ArrayMetadata field so a failure names the mismatched field.
+void AssertArrayEquals(const ArrayMetadata& left, const ArrayMetadata& right) {
+  EXPECT_EQ(left.type, right.type);
+  EXPECT_EQ(left.offset, right.offset);
+  EXPECT_EQ(left.length, right.length);
+  EXPECT_EQ(left.null_count, right.null_count);
+  EXPECT_EQ(left.total_bytes, right.total_bytes);
+}
+
+TEST_F(TestTableBuilder, AddPrimitiveColumn) {
+  std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("f0");
+
+  ArrayMetadata values1;
+  ArrayMetadata values2;
+  values1.type = fbs::Type_INT32;
+  values1.offset = 10000;
+  values1.length = 1000;
+  values1.null_count = 100;
+  values1.total_bytes = 4000;
+
+  cb->SetValues(values1);
+
+  std::string user_meta = "as you wish";
+  cb->SetUserMetadata(user_meta);
+
+  ASSERT_OK(cb->Finish());
+
+  cb = tb_->AddColumn("f1");
+
+  values2.type = fbs::Type_UTF8;
+  values2.offset = 14000;
+  values2.length = 1000;
+  values2.null_count = 100;
+  values2.total_bytes = 10000;
+
+  cb->SetValues(values2);
+  ASSERT_OK(cb->Finish());
+
+  Finish();
+
+  ASSERT_EQ(2, table_->num_columns());
+
+  auto col = table_->column(0);
+
+  ASSERT_EQ("f0", col->name()->str());
+  ASSERT_EQ(user_meta, col->user_metadata()->str());
+
+  ArrayMetadata values3;
+  FromFlatbuffer(col->values(), &values3);
+  AssertArrayEquals(values3, values1);
+
+  col = table_->column(1);
+  ASSERT_EQ("f1", col->name()->str());
+
+  ArrayMetadata values4;
+  FromFlatbuffer(col->values(), &values4);
+  AssertArrayEquals(values4, values2);
+}
+
+TEST_F(TestTableBuilder, AddCategoryColumn) {
+  ArrayMetadata values1(fbs::Type_UINT8, 10000, 1000, 100, 4000);
+  ArrayMetadata levels(fbs::Type_UTF8, 14000, 10, 0, 300);
+
+  std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("c0");
+  cb->SetValues(values1);
+  cb->SetCategory(levels);
+  ASSERT_OK(cb->Finish());
+
+  cb = tb_->AddColumn("c1");
+  cb->SetValues(values1);
+  cb->SetCategory(levels, true);
+  ASSERT_OK(cb->Finish());
+
+  Finish();
+
+  auto col = table_->column(0);
+  ASSERT_EQ(fbs::TypeMetadata_CategoryMetadata, col->metadata_type());
+
+  ArrayMetadata result;
+  FromFlatbuffer(col->values(), &result);
+  AssertArrayEquals(result, values1);
+
+  auto cat_ptr = static_cast<const fbs::CategoryMetadata*>(col->metadata());
+  ASSERT_FALSE(cat_ptr->ordered());
+
+  FromFlatbuffer(cat_ptr->levels(), &result);
+  AssertArrayEquals(result, levels);
+
+  col = table_->column(1);
+  cat_ptr = static_cast<const fbs::CategoryMetadata*>(col->metadata());
+  ASSERT_TRUE(cat_ptr->ordered());
+  FromFlatbuffer(cat_ptr->levels(), &result);
+  AssertArrayEquals(result, levels);
+}
+
+TEST_F(TestTableBuilder, AddTimestampColumn) {
+  ArrayMetadata values1(fbs::Type_INT64, 10000, 1000, 100, 4000);
+  std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("c0");
+  cb->SetValues(values1);
+  cb->SetTimestamp(TimeUnit::MILLI);
+  ASSERT_OK(cb->Finish());
+
+  cb = tb_->AddColumn("c1");
+
+  std::string tz("America/Los_Angeles");
+
+  cb->SetValues(values1);
+  cb->SetTimestamp(TimeUnit::SECOND, tz);
+  ASSERT_OK(cb->Finish());
+
+  Finish();
+
+  auto col = table_->column(0);
+
+  ASSERT_EQ(fbs::TypeMetadata_TimestampMetadata, col->metadata_type());
+
+  ArrayMetadata result;
+  FromFlatbuffer(col->values(), &result);
+  AssertArrayEquals(result, values1);
+
+  auto ts_ptr = static_cast<const fbs::TimestampMetadata*>(col->metadata());
+  ASSERT_EQ(fbs::TimeUnit_MILLISECOND, ts_ptr->unit());
+
+  col = table_->column(1);
+  ts_ptr = static_cast<const fbs::TimestampMetadata*>(col->metadata());
+  ASSERT_EQ(fbs::TimeUnit_SECOND, ts_ptr->unit());
+  ASSERT_EQ(tz, ts_ptr->timezone()->str());
+}
+
+TEST_F(TestTableBuilder, AddDateColumn) {
+  ArrayMetadata values1(fbs::Type_INT64, 10000, 1000, 100, 4000);
+  std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("d0");
+  cb->SetValues(values1);
+  cb->SetDate();
+  ASSERT_OK(cb->Finish());
+
+  Finish();
+
+  auto col = table_->column(0);
+
+  ASSERT_EQ(fbs::TypeMetadata_DateMetadata, col->metadata_type());
+  ArrayMetadata result;
+  FromFlatbuffer(col->values(), &result);
+  AssertArrayEquals(result, values1);
+}
+
+TEST_F(TestTableBuilder, AddTimeColumn) {
+  ArrayMetadata values1(fbs::Type_INT64, 10000, 1000, 100, 4000);
+  std::unique_ptr<ColumnBuilder> cb = tb_->AddColumn("c0");
+  cb->SetValues(values1);
+  cb->SetTime(TimeUnit::SECOND);
+  ASSERT_OK(cb->Finish());
+  Finish();
+
+  auto col = table_->column(0);
+
+  ASSERT_EQ(fbs::TypeMetadata_TimeMetadata, col->metadata_type());
+  ArrayMetadata result;
+  FromFlatbuffer(col->values(), &result);
+  AssertArrayEquals(result, values1);
+
+  auto t_ptr = static_cast<const fbs::TimeMetadata*>(col->metadata());
+  ASSERT_EQ(fbs::TimeUnit_SECOND, t_ptr->unit());
+}
+
+// Assert that two arrays are equal, pretty-printing both on mismatch.
+void CheckArrays(const Array& expected, const Array& result) {
+  if (!result.Equals(expected)) {
+    std::stringstream pp_result;
+    std::stringstream pp_expected;
+
+    EXPECT_OK(PrettyPrint(result, 0, &pp_result));
+    EXPECT_OK(PrettyPrint(expected, 0, &pp_expected));
+    FAIL() << "Got: " << pp_result.str() << "\nExpected: " << pp_expected.str();
+  }
+}
+
+// Fixture that writes columns with TableWriter to an in-memory stream and reads
+// the finished file back with TableReader.
+class TestTableWriter : public ::testing::Test {
+ public:
+  void SetUp() {
+    ASSERT_OK(io::BufferOutputStream::Create(1024, default_memory_pool(), &stream_));
+    ASSERT_OK(TableWriter::Open(stream_, &writer_));
+  }
+
+  void Finish() {
+    // Write table footer
+    ASSERT_OK(writer_->Finalize());
+
+    ASSERT_OK(stream_->Finish(&output_));
+
+    std::shared_ptr<io::BufferReader> buffer(new io::BufferReader(output_));
+    reader_.reset(new TableReader());
+    ASSERT_OK(reader_->Open(buffer));
+  }
+
+  // Append every column of `batch`, finalize the file, and check that each
+  // column reads back with the same name and values.
+  void CheckBatch(const RecordBatch& batch) {
+    for (int i = 0; i < batch.num_columns(); ++i) {
+      ASSERT_OK(writer_->Append(batch.column_name(i), *batch.column(i)));
+    }
+    Finish();
+
+    std::shared_ptr<Column> col;
+    for (int i = 0; i < batch.num_columns(); ++i) {
+      ASSERT_OK(reader_->GetColumn(i, &col));
+      ASSERT_EQ(batch.column_name(i), col->name());
+      CheckArrays(*batch.column(i), *col->data()->chunk(0));
+    }
+  }
+
+ protected:
+  std::shared_ptr<io::BufferOutputStream> stream_;
+  std::unique_ptr<TableWriter> writer_;
+  std::unique_ptr<TableReader> reader_;
+
+  std::shared_ptr<Buffer> output_;
+};
+
+TEST_F(TestTableWriter, EmptyTable) {
+  Finish();
+
+  ASSERT_FALSE(reader_->HasDescription());
+  ASSERT_EQ("", reader_->GetDescription());
+
+  ASSERT_EQ(0, reader_->num_rows());
+  ASSERT_EQ(0, reader_->num_columns());
+}
+
+TEST_F(TestTableWriter, SetNumRows) {
+  writer_->SetNumRows(1000);
+  Finish();
+  ASSERT_EQ(1000, reader_->num_rows());
+}
+
+TEST_F(TestTableWriter, SetDescription) {
+  std::string desc("contents of the file");
+  writer_->SetDescription(desc);
+  Finish();
+
+  ASSERT_TRUE(reader_->HasDescription());
+  ASSERT_EQ(desc, reader_->GetDescription());
+
+  ASSERT_EQ(0, reader_->num_rows());
+  ASSERT_EQ(0, reader_->num_columns());
+}
+
+TEST_F(TestTableWriter, PrimitiveRoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeIntRecordBatch(&batch));
+
+  ASSERT_OK(writer_->Append("f0", *batch->column(0)));
+  ASSERT_OK(writer_->Append("f1", *batch->column(1)));
+  Finish();
+
+  std::shared_ptr<Column> col;
+  ASSERT_OK(reader_->GetColumn(0, &col));
+  ASSERT_TRUE(col->data()->chunk(0)->Equals(batch->column(0)));
+  ASSERT_EQ("f0", col->name());
+
+  ASSERT_OK(reader_->GetColumn(1, &col));
+  ASSERT_TRUE(col->data()->chunk(0)->Equals(batch->column(1)));
+  ASSERT_EQ("f1", col->name());
+}
+
+// Build a batch of three dictionary-encoded columns with int32 and int8 indices.
+Status MakeDictionaryFlat(std::shared_ptr<RecordBatch>* out) {
+  const int64_t length = 6;
+
+  std::vector<bool> is_valid = {true, true, false, true, true, true};
+  std::shared_ptr<Array> dict1, dict2;
+
+  std::vector<std::string> dict1_values = {"foo", "bar", "baz"};
+  std::vector<std::string> dict2_values = {"foo", "bar", "baz", "qux"};
+
+  ArrayFromVector<StringType, std::string>(dict1_values, &dict1);
+  ArrayFromVector<StringType, std::string>(dict2_values, &dict2);
+
+  auto f0_type = arrow::dictionary(arrow::int32(), dict1);
+  auto f1_type = arrow::dictionary(arrow::int8(), dict1);
+  auto f2_type = arrow::dictionary(arrow::int32(), dict2);
+
+  std::shared_ptr<Array> indices0, indices1, indices2;
+  std::vector<int32_t> indices0_values = {1, 2, -1, 0, 2, 0};
+  std::vector<int8_t> indices1_values = {0, 0, 2, 2, 1, 1};
+  std::vector<int32_t> indices2_values = {3, 0, 2, 1, 0, 2};
+
+  ArrayFromVector<Int32Type, int32_t>(is_valid, indices0_values, &indices0);
+  ArrayFromVector<Int8Type, int8_t>(is_valid, indices1_values, &indices1);
+  ArrayFromVector<Int32Type, int32_t>(is_valid, indices2_values, &indices2);
+
+  auto a0 = std::make_shared<DictionaryArray>(f0_type, indices0);
+  auto a1 = std::make_shared<DictionaryArray>(f1_type, indices1);
+  auto a2 = std::make_shared<DictionaryArray>(f2_type, indices2);
+
+  // construct batch
+  std::shared_ptr<Schema> schema(new Schema(
+      {field("dict1", f0_type), field("sparse", f1_type), field("dense", f2_type)}));
+
+  std::vector<std::shared_ptr<Array>> arrays = {a0, a1, a2};
+  out->reset(new RecordBatch(schema, length, arrays));
+  return Status::OK();
+}
+
+TEST_F(TestTableWriter, CategoryRoundtrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeDictionaryFlat(&batch));
+  CheckBatch(*batch);
+}
+
+TEST_F(TestTableWriter, TimeTypes) {
+  std::vector<bool> is_valid = {true, true, true, false, true, true, true};
+  auto f0 = field("f0", date32());
+  auto f1 = field("f1", time(TimeUnit::MILLI));
+  auto f2 = field("f2", timestamp(TimeUnit::NANO));
+  auto f3 = field("f3", timestamp("America/Los_Angeles", TimeUnit::SECOND));
+  std::shared_ptr<Schema> schema(new Schema({f0, f1, f2, f3}));
+
+  std::vector<int64_t> values_vec = {0, 1, 2, 3, 4, 5, 6};
+  std::shared_ptr<Array> values;
+  ArrayFromVector<Int64Type, int64_t>(is_valid, values_vec, &values);
+
+  std::vector<int32_t> date_values_vec = {0, 1, 2, 3, 4, 5, 6};
+  std::shared_ptr<Array> date_array;
+  ArrayFromVector<Date32Type, int32_t>(is_valid, date_values_vec, &date_array);
+
+  std::vector<FieldMetadata> fields(1);
+  fields[0].length = values->length();
+  fields[0].null_count = values->null_count();
+  fields[0].offset = 0;
+
+  const auto& prim_values = static_cast<const PrimitiveArray&>(*values);
+  std::vector<std::shared_ptr<Buffer>> buffers = {
+      prim_values.null_bitmap(), prim_values.data()};
+
+  std::vector<std::shared_ptr<Array>> arrays;
+  arrays.push_back(date_array);
+
+  for (int i = 1; i < schema->num_fields(); ++i) {
+    std::shared_ptr<Array> arr;
+    ASSERT_OK(LoadArray(schema->field(i)->type, fields, buffers, &arr));
+    arrays.push_back(arr);
+  }
+
+  RecordBatch batch(schema, values->length(), arrays);
+  CheckBatch(batch);
+}
+
+TEST_F(TestTableWriter, VLenPrimitiveRoundTrip) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeStringTypesRecordBatch(&batch));
+  CheckBatch(*batch);
+}
+
+}  // namespace feather
+}  // namespace ipc
+}  // namespace arrow
http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/cpp/src/arrow/ipc/feather.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
new file mode 100644
index 0000000..13dfa58
--- /dev/null
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -0,0 +1,729 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "arrow/ipc/feather.h" + +#include <algorithm> +#include <cstdint> +#include <cstring> +#include <iostream> +#include <memory> +#include <sstream> +#include <string> +#include <vector> + +#include "flatbuffers/flatbuffers.h" + +#include "arrow/array.h" +#include "arrow/buffer.h" +#include "arrow/column.h" +#include "arrow/io/file.h" +#include "arrow/ipc/feather-internal.h" +#include "arrow/ipc/feather_generated.h" +#include "arrow/loader.h" +#include "arrow/status.h" +#include "arrow/util/bit-util.h" + +namespace arrow { +namespace ipc { +namespace feather { + +static const uint8_t kPaddingBytes[kFeatherDefaultAlignment] = {0}; + +static inline int64_t PaddedLength(int64_t nbytes) { + static const int64_t alignment = kFeatherDefaultAlignment; + return ((nbytes + alignment - 1) / alignment) * alignment; +} + +// XXX: Hack for Feather 0.3.0 for backwards compatibility with old files +// Size in-file of written byte buffer +static int64_t GetOutputLength(int64_t nbytes) { + if (kFeatherVersion < 2) { + // Feather files < 0.3.0 + return nbytes; + } else { + return PaddedLength(nbytes); + } +} + +static Status WritePadded(io::OutputStream* stream, const uint8_t* data, int64_t length, + int64_t* bytes_written) { + RETURN_NOT_OK(stream->Write(data, length)); + + int64_t remainder = PaddedLength(length) - length; + if (remainder != 0) { RETURN_NOT_OK(stream->Write(kPaddingBytes, remainder)); } + *bytes_written = length + remainder; + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// TableBuilder + +TableBuilder::TableBuilder(int64_t num_rows) : finished_(false), num_rows_(num_rows) {} + +FBB& TableBuilder::fbb() { + return fbb_; +} + +Status TableBuilder::Finish() { + if (finished_) { return Status::Invalid("can only call this once"); } + + FBString desc = 0; + if (!description_.empty()) { desc = fbb_.CreateString(description_); } + + flatbuffers::Offset<flatbuffers::String> metadata = 0; + + auto root = 
fbs::CreateCTable( + fbb_, desc, num_rows_, fbb_.CreateVector(columns_), kFeatherVersion, metadata); + fbb_.Finish(root); + finished_ = true; + + return Status::OK(); +} + +std::shared_ptr<Buffer> TableBuilder::GetBuffer() const { + return std::make_shared<Buffer>( + fbb_.GetBufferPointer(), static_cast<int64_t>(fbb_.GetSize())); +} + +void TableBuilder::SetDescription(const std::string& description) { + description_ = description; +} + +void TableBuilder::SetNumRows(int64_t num_rows) { + num_rows_ = num_rows; +} + +void TableBuilder::add_column(const flatbuffers::Offset<fbs::Column>& col) { + columns_.push_back(col); +} + +ColumnBuilder::ColumnBuilder(TableBuilder* parent, const std::string& name) + : parent_(parent) { + fbb_ = &parent->fbb(); + name_ = name; + type_ = ColumnType::PRIMITIVE; +} + +flatbuffers::Offset<void> ColumnBuilder::CreateColumnMetadata() { + switch (type_) { + case ColumnType::PRIMITIVE: + // flatbuffer void + return 0; + case ColumnType::CATEGORY: { + auto cat_meta = fbs::CreateCategoryMetadata( + fbb(), GetPrimitiveArray(fbb(), meta_category_.levels), meta_category_.ordered); + return cat_meta.Union(); + } + case ColumnType::TIMESTAMP: { + // flatbuffer void + flatbuffers::Offset<flatbuffers::String> tz = 0; + if (!meta_timestamp_.timezone.empty()) { + tz = fbb().CreateString(meta_timestamp_.timezone); + } + + auto ts_meta = + fbs::CreateTimestampMetadata(fbb(), ToFlatbufferEnum(meta_timestamp_.unit), tz); + return ts_meta.Union(); + } + case ColumnType::DATE: { + auto date_meta = fbs::CreateDateMetadata(fbb()); + return date_meta.Union(); + } + case ColumnType::TIME: { + auto time_meta = fbs::CreateTimeMetadata(fbb(), ToFlatbufferEnum(meta_time_.unit)); + return time_meta.Union(); + } + default: + // null + return flatbuffers::Offset<void>(); + } +} + +Status ColumnBuilder::Finish() { + FBB& buf = fbb(); + + // values + auto values = GetPrimitiveArray(buf, values_); + flatbuffers::Offset<void> metadata = CreateColumnMetadata(); + + auto 
column = fbs::CreateColumn(buf, buf.CreateString(name_), values, + ToFlatbufferEnum(type_), // metadata_type + metadata, buf.CreateString(user_metadata_)); + + // bad coupling, but OK for now + parent_->add_column(column); + return Status::OK(); +} + +void ColumnBuilder::SetValues(const ArrayMetadata& values) { + values_ = values; +} + +void ColumnBuilder::SetUserMetadata(const std::string& data) { + user_metadata_ = data; +} + +void ColumnBuilder::SetCategory(const ArrayMetadata& levels, bool ordered) { + type_ = ColumnType::CATEGORY; + meta_category_.levels = levels; + meta_category_.ordered = ordered; +} + +void ColumnBuilder::SetTimestamp(TimeUnit unit) { + type_ = ColumnType::TIMESTAMP; + meta_timestamp_.unit = unit; +} + +void ColumnBuilder::SetTimestamp(TimeUnit unit, const std::string& timezone) { + SetTimestamp(unit); + meta_timestamp_.timezone = timezone; +} + +void ColumnBuilder::SetDate() { + type_ = ColumnType::DATE; +} + +void ColumnBuilder::SetTime(TimeUnit unit) { + type_ = ColumnType::TIME; + meta_time_.unit = unit; +} + +FBB& ColumnBuilder::fbb() { + return *fbb_; +} + +std::unique_ptr<ColumnBuilder> TableBuilder::AddColumn(const std::string& name) { + return std::unique_ptr<ColumnBuilder>(new ColumnBuilder(this, name)); +} + +// ---------------------------------------------------------------------- +// reader.cc + +class TableReader::TableReaderImpl { + public: + TableReaderImpl() {} + + Status Open(const std::shared_ptr<io::ReadableFileInterface>& source) { + source_ = source; + + int magic_size = static_cast<int>(strlen(kFeatherMagicBytes)); + int footer_size = magic_size + static_cast<int>(sizeof(uint32_t)); + + // Pathological issue where the file is smaller than + int64_t size = 0; + RETURN_NOT_OK(source->GetSize(&size)); + if (size < magic_size + footer_size) { + return Status::Invalid("File is too small to be a well-formed file"); + } + + std::shared_ptr<Buffer> buffer; + RETURN_NOT_OK(source->Read(magic_size, &buffer)); + + if 
(memcmp(buffer->data(), kFeatherMagicBytes, magic_size)) { + return Status::Invalid("Not a feather file"); + } + + // Now get the footer and verify + RETURN_NOT_OK(source->ReadAt(size - footer_size, footer_size, &buffer)); + + if (memcmp(buffer->data() + sizeof(uint32_t), kFeatherMagicBytes, magic_size)) { + return Status::Invalid("Feather file footer incomplete"); + } + + uint32_t metadata_length = *reinterpret_cast<const uint32_t*>(buffer->data()); + if (size < magic_size + footer_size + metadata_length) { + return Status::Invalid("File is smaller than indicated metadata size"); + } + RETURN_NOT_OK( + source->ReadAt(size - footer_size - metadata_length, metadata_length, &buffer)); + + metadata_.reset(new TableMetadata()); + return metadata_->Open(buffer); + } + + Status GetDataType(const fbs::PrimitiveArray* values, fbs::TypeMetadata metadata_type, + const void* metadata, std::shared_ptr<DataType>* out) { +#define PRIMITIVE_CASE(CAP_TYPE, FACTORY_FUNC) \ + case fbs::Type_##CAP_TYPE: \ + *out = FACTORY_FUNC(); \ + break; + + switch (metadata_type) { + case fbs::TypeMetadata_CategoryMetadata: { + auto meta = static_cast<const fbs::CategoryMetadata*>(metadata); + + std::shared_ptr<DataType> index_type; + RETURN_NOT_OK(GetDataType(values, fbs::TypeMetadata_NONE, nullptr, &index_type)); + + std::shared_ptr<Array> levels; + RETURN_NOT_OK( + LoadValues(meta->levels(), fbs::TypeMetadata_NONE, nullptr, &levels)); + + *out = std::make_shared<DictionaryType>(index_type, levels, meta->ordered()); + break; + } + case fbs::TypeMetadata_TimestampMetadata: { + auto meta = static_cast<const fbs::TimestampMetadata*>(metadata); + TimeUnit unit = FromFlatbufferEnum(meta->unit()); + std::string tz; + // flatbuffer non-null + if (meta->timezone() != 0) { + tz = meta->timezone()->str(); + } else { + tz = ""; + } + *out = std::make_shared<TimestampType>(tz, unit); + } break; + case fbs::TypeMetadata_DateMetadata: + *out = date32(); + break; + case fbs::TypeMetadata_TimeMetadata: { + 
auto meta = static_cast<const fbs::TimeMetadata*>(metadata); + *out = std::make_shared<TimeType>(FromFlatbufferEnum(meta->unit())); + } break; + default: + switch (values->type()) { + PRIMITIVE_CASE(BOOL, boolean); + PRIMITIVE_CASE(INT8, int8); + PRIMITIVE_CASE(INT16, int16); + PRIMITIVE_CASE(INT32, int32); + PRIMITIVE_CASE(INT64, int64); + PRIMITIVE_CASE(UINT8, uint8); + PRIMITIVE_CASE(UINT16, uint16); + PRIMITIVE_CASE(UINT32, uint32); + PRIMITIVE_CASE(UINT64, uint64); + PRIMITIVE_CASE(FLOAT, float32); + PRIMITIVE_CASE(DOUBLE, float64); + PRIMITIVE_CASE(UTF8, utf8); + PRIMITIVE_CASE(BINARY, binary); + default: + return Status::Invalid("Unrecognized type"); + } + break; + } + +#undef PRIMITIVE_CASE + + return Status::OK(); + } + + // Retrieve a primitive array from the data source + // + // @returns: a Buffer instance, the precise type will depend on the kind of + // input data source (which may or may not have memory-map like semantics) + Status LoadValues(const fbs::PrimitiveArray* meta, fbs::TypeMetadata metadata_type, + const void* metadata, std::shared_ptr<Array>* out) { + std::shared_ptr<DataType> type; + RETURN_NOT_OK(GetDataType(meta, metadata_type, metadata, &type)); + + std::vector<FieldMetadata> fields(1); + std::vector<std::shared_ptr<Buffer>> buffers; + + // Buffer data from the source (may or may not perform a copy depending on + // input source) + std::shared_ptr<Buffer> buffer; + RETURN_NOT_OK(source_->ReadAt(meta->offset(), meta->total_bytes(), &buffer)); + + int64_t offset = 0; + + // If there are nulls, the null bitmask is first + if (meta->null_count() > 0) { + int64_t null_bitmap_size = GetOutputLength(BitUtil::BytesForBits(meta->length())); + buffers.push_back(SliceBuffer(buffer, offset, null_bitmap_size)); + offset += null_bitmap_size; + } else { + buffers.push_back(nullptr); + } + + if (is_binary_like(type->type)) { + int64_t offsets_size = GetOutputLength((meta->length() + 1) * sizeof(int32_t)); + buffers.push_back(SliceBuffer(buffer, 
offset, offsets_size)); + offset += offsets_size; + } + + buffers.push_back(SliceBuffer(buffer, offset, buffer->size() - offset)); + + fields[0].length = meta->length(); + fields[0].null_count = meta->null_count(); + fields[0].offset = 0; + + return LoadArray(type, fields, buffers, out); + } + + bool HasDescription() const { return metadata_->HasDescription(); } + + std::string GetDescription() const { return metadata_->GetDescription(); } + + int version() const { return metadata_->version(); } + int64_t num_rows() const { return metadata_->num_rows(); } + int64_t num_columns() const { return metadata_->num_columns(); } + + std::string GetColumnName(int i) const { + const fbs::Column* col_meta = metadata_->column(i); + return col_meta->name()->str(); + } + + Status GetColumn(int i, std::shared_ptr<Column>* out) { + const fbs::Column* col_meta = metadata_->column(i); + + // auto user_meta = column->user_metadata(); + // if (user_meta->size() > 0) { user_metadata_ = user_meta->str(); } + + std::shared_ptr<Array> values; + RETURN_NOT_OK(LoadValues( + col_meta->values(), col_meta->metadata_type(), col_meta->metadata(), &values)); + out->reset(new Column(col_meta->name()->str(), values)); + return Status::OK(); + } + + private: + std::shared_ptr<io::ReadableFileInterface> source_; + std::unique_ptr<TableMetadata> metadata_; + + std::shared_ptr<Schema> schema_; +}; + +// ---------------------------------------------------------------------- +// TableReader public API + +TableReader::TableReader() { + impl_.reset(new TableReaderImpl()); +} + +TableReader::~TableReader() {} + +Status TableReader::Open(const std::shared_ptr<io::ReadableFileInterface>& source) { + return impl_->Open(source); +} + +Status TableReader::OpenFile( + const std::string& abspath, std::unique_ptr<TableReader>* out) { + std::shared_ptr<io::MemoryMappedFile> file; + RETURN_NOT_OK(io::MemoryMappedFile::Open(abspath, io::FileMode::READ, &file)); + out->reset(new TableReader()); + return 
(*out)->Open(file); +} + +bool TableReader::HasDescription() const { + return impl_->HasDescription(); +} + +std::string TableReader::GetDescription() const { + return impl_->GetDescription(); +} + +int TableReader::version() const { + return impl_->version(); +} + +int64_t TableReader::num_rows() const { + return impl_->num_rows(); +} + +int64_t TableReader::num_columns() const { + return impl_->num_columns(); +} + +std::string TableReader::GetColumnName(int i) const { + return impl_->GetColumnName(i); +} + +Status TableReader::GetColumn(int i, std::shared_ptr<Column>* out) { + return impl_->GetColumn(i, out); +} + +// ---------------------------------------------------------------------- +// writer.cc + +fbs::Type ToFlatbufferType(Type::type type) { + switch (type) { + case Type::BOOL: + return fbs::Type_BOOL; + case Type::INT8: + return fbs::Type_INT8; + case Type::INT16: + return fbs::Type_INT16; + case Type::INT32: + return fbs::Type_INT32; + case Type::INT64: + return fbs::Type_INT64; + case Type::UINT8: + return fbs::Type_UINT8; + case Type::UINT16: + return fbs::Type_UINT16; + case Type::UINT32: + return fbs::Type_UINT32; + case Type::UINT64: + return fbs::Type_UINT64; + case Type::FLOAT: + return fbs::Type_FLOAT; + case Type::DOUBLE: + return fbs::Type_DOUBLE; + case Type::STRING: + return fbs::Type_UTF8; + case Type::BINARY: + return fbs::Type_BINARY; + case Type::DATE32: + return fbs::Type_DATE; + case Type::TIMESTAMP: + return fbs::Type_TIMESTAMP; + case Type::TIME: + return fbs::Type_TIME; + case Type::DICTIONARY: + return fbs::Type_CATEGORY; + default: + break; + } + // prevent compiler warning + return fbs::Type_MIN; +} + +class TableWriter::TableWriterImpl : public ArrayVisitor { + public: + TableWriterImpl() : initialized_stream_(false), metadata_(0) {} + + Status Open(const std::shared_ptr<io::OutputStream>& stream) { + stream_ = stream; + return Status::OK(); + } + + void SetDescription(const std::string& desc) { metadata_.SetDescription(desc); } 
+ + void SetNumRows(int64_t num_rows) { metadata_.SetNumRows(num_rows); } + + Status Finalize() { + RETURN_NOT_OK(CheckStarted()); + metadata_.Finish(); + + auto buffer = metadata_.GetBuffer(); + + // Writer metadata + int64_t bytes_written; + RETURN_NOT_OK( + WritePadded(stream_.get(), buffer->data(), buffer->size(), &bytes_written)); + uint32_t buffer_size = static_cast<uint32_t>(bytes_written); + + // Footer: metadata length, magic bytes + RETURN_NOT_OK( + stream_->Write(reinterpret_cast<const uint8_t*>(&buffer_size), sizeof(uint32_t))); + RETURN_NOT_OK(stream_->Write(reinterpret_cast<const uint8_t*>(kFeatherMagicBytes), + strlen(kFeatherMagicBytes))); + return stream_->Close(); + } + + Status LoadArrayMetadata(const Array& values, ArrayMetadata* meta) { + if (!(is_primitive(values.type_enum()) || is_binary_like(values.type_enum()))) { + std::stringstream ss; + ss << "Array is not primitive type: " << values.type()->ToString(); + return Status::Invalid(ss.str()); + } + + meta->type = ToFlatbufferType(values.type_enum()); + + RETURN_NOT_OK(stream_->Tell(&meta->offset)); + + meta->length = values.length(); + meta->null_count = values.null_count(); + meta->total_bytes = 0; + + return Status::OK(); + } + + Status WriteArray(const Array& values, ArrayMetadata* meta) { + RETURN_NOT_OK(CheckStarted()); + RETURN_NOT_OK(LoadArrayMetadata(values, meta)); + + int64_t bytes_written; + + // Write the null bitmask + if (values.null_count() > 0) { + // We assume there is one bit for each value in values.nulls, aligned on a + // byte boundary, and we write this much data into the stream + RETURN_NOT_OK(WritePadded(stream_.get(), values.null_bitmap()->data(), + values.null_bitmap()->size(), &bytes_written)); + meta->total_bytes += bytes_written; + } + + int64_t values_bytes = 0; + + const uint8_t* values_buffer = nullptr; + + if (is_binary_like(values.type_enum())) { + const auto& bin_values = static_cast<const BinaryArray&>(values); + + int64_t offset_bytes = sizeof(int32_t) * 
(values.length() + 1); + + values_bytes = bin_values.raw_value_offsets()[values.length()]; + + // Write the variable-length offsets + RETURN_NOT_OK(WritePadded(stream_.get(), + reinterpret_cast<const uint8_t*>(bin_values.raw_value_offsets()), offset_bytes, + &bytes_written)) + meta->total_bytes += bytes_written; + + if (bin_values.data()) { values_buffer = bin_values.data()->data(); } + } else { + const auto& prim_values = static_cast<const PrimitiveArray&>(values); + const auto& fw_type = static_cast<const FixedWidthType&>(*values.type()); + + if (values.type_enum() == Type::BOOL) { + // Booleans are bit-packed + values_bytes = BitUtil::BytesForBits(values.length()); + } else { + values_bytes = values.length() * fw_type.bit_width() / 8; + } + + if (prim_values.data()) { values_buffer = prim_values.data()->data(); } + } + RETURN_NOT_OK( + WritePadded(stream_.get(), values_buffer, values_bytes, &bytes_written)); + meta->total_bytes += bytes_written; + + return Status::OK(); + } + + Status WritePrimitiveValues(const Array& values) { + // Prepare metadata payload + ArrayMetadata meta; + RETURN_NOT_OK(WriteArray(values, &meta)); + current_column_->SetValues(meta); + return Status::OK(); + } + +#define VISIT_PRIMITIVE(TYPE) \ + Status Visit(const TYPE& values) override { return WritePrimitiveValues(values); } + + VISIT_PRIMITIVE(BooleanArray); + VISIT_PRIMITIVE(Int8Array); + VISIT_PRIMITIVE(Int16Array); + VISIT_PRIMITIVE(Int32Array); + VISIT_PRIMITIVE(Int64Array); + VISIT_PRIMITIVE(UInt8Array); + VISIT_PRIMITIVE(UInt16Array); + VISIT_PRIMITIVE(UInt32Array); + VISIT_PRIMITIVE(UInt64Array); + VISIT_PRIMITIVE(FloatArray); + VISIT_PRIMITIVE(DoubleArray); + VISIT_PRIMITIVE(BinaryArray); + VISIT_PRIMITIVE(StringArray); + +#undef VISIT_PRIMITIVE + + Status Visit(const DictionaryArray& values) override { + const auto& dict_type = static_cast<const DictionaryType&>(*values.type()); + + if (!is_integer(values.indices()->type_enum())) { + return Status::Invalid("Category values 
must be integers"); + } + + RETURN_NOT_OK(WritePrimitiveValues(*values.indices())); + + ArrayMetadata levels_meta; + RETURN_NOT_OK(WriteArray(*dict_type.dictionary(), &levels_meta)); + current_column_->SetCategory(levels_meta, dict_type.ordered()); + return Status::OK(); + } + + Status Visit(const TimestampArray& values) override { + RETURN_NOT_OK(WritePrimitiveValues(values)); + const auto& ts_type = static_cast<const TimestampType&>(*values.type()); + current_column_->SetTimestamp(ts_type.unit, ts_type.timezone); + return Status::OK(); + } + + Status Visit(const Date32Array& values) override { + RETURN_NOT_OK(WritePrimitiveValues(values)); + current_column_->SetDate(); + return Status::OK(); + } + + Status Visit(const TimeArray& values) override { + RETURN_NOT_OK(WritePrimitiveValues(values)); + auto unit = static_cast<const TimeType&>(*values.type()).unit; + current_column_->SetTime(unit); + return Status::OK(); + } + + Status Append(const std::string& name, const Array& values) { + current_column_ = metadata_.AddColumn(name); + RETURN_NOT_OK(values.Accept(this)); + current_column_->Finish(); + return Status::OK(); + } + + private: + Status CheckStarted() { + if (!initialized_stream_) { + int64_t bytes_written_unused; + RETURN_NOT_OK( + WritePadded(stream_.get(), reinterpret_cast<const uint8_t*>(kFeatherMagicBytes), + strlen(kFeatherMagicBytes), &bytes_written_unused)); + initialized_stream_ = true; + } + return Status::OK(); + } + + std::shared_ptr<io::OutputStream> stream_; + + bool initialized_stream_; + TableBuilder metadata_; + + std::unique_ptr<ColumnBuilder> current_column_; + + Status AppendPrimitive(const PrimitiveArray& values, ArrayMetadata* out); +}; + +TableWriter::TableWriter() { + impl_.reset(new TableWriterImpl()); +} + +TableWriter::~TableWriter() {} + +Status TableWriter::Open( + const std::shared_ptr<io::OutputStream>& stream, std::unique_ptr<TableWriter>* out) { + out->reset(new TableWriter()); + return (*out)->impl_->Open(stream); +} + 
+/// Open the local file `abspath` for writing (io::FileOutputStream) and +/// hand it to TableWriter::Open, which creates the writer stored in `out`. +Status TableWriter::OpenFile( + const std::string& abspath, std::unique_ptr<TableWriter>* out) { + std::shared_ptr<io::FileOutputStream> file_stream; + RETURN_NOT_OK(io::FileOutputStream::Open(abspath, &file_stream)); + return Open(file_stream, out); +} + +/// Optional free-text description, stored in the file metadata. +void TableWriter::SetDescription(const std::string& description) { + impl_->SetDescription(description); +} + +/// Row count stored in the file metadata. +void TableWriter::SetNumRows(int64_t rows) { + impl_->SetNumRows(rows); +} + +/// Serialize `values` as a new column called `name`. +Status TableWriter::Append(const std::string& name, const Array& values) { + return impl_->Append(name, values); +} + +/// Write the metadata and footer, then close the output stream. +Status TableWriter::Finalize() { + return impl_->Finalize(); +} + +} // namespace feather +} // namespace ipc +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/cpp/src/arrow/ipc/feather.fbs ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/feather.fbs b/cpp/src/arrow/ipc/feather.fbs new file mode 100644 index 0000000..a27d399 --- /dev/null +++ b/cpp/src/arrow/ipc/feather.fbs @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +namespace arrow.ipc.feather.fbs; + +/// Feather is an experimental serialization format implemented using +/// techniques from Apache Arrow. It was created as a proof-of-concept of an +/// interoperable file format for storing data frames originating in Python or +/// R. It enabled the developers to sidestep some of the open design questions +/// in Arrow from early 2016 and instead create something simple and useful for +/// the intended use cases. + +/// Storage type of a PrimitiveArray. The C++ writer maps Arrow's STRING to +/// UTF8, DATE32 to DATE and DICTIONARY to CATEGORY. +enum Type : byte { + BOOL = 0, + + INT8 = 1, + INT16 = 2, + INT32 = 3, + INT64 = 4, + + UINT8 = 5, + UINT16 = 6, + UINT32 = 7, + UINT64 = 8, + + FLOAT = 9, + DOUBLE = 10, + + UTF8 = 11, + + BINARY = 12, + + CATEGORY = 13, + + TIMESTAMP = 14, + DATE = 15, + TIME = 16 +} + +enum Encoding : byte { + PLAIN = 0, + + /// Data is stored dictionary-encoded + /// dictionary size: <INT32 Dictionary size> + /// dictionary data: <TYPE primitive array> + /// dictionary index: <INT32 primitive array> + /// + /// TODO: do we care about storing the index values in a smaller typeclass + DICTIONARY = 1 +} + +/// Resolution of TIMESTAMP and TIME values +enum TimeUnit : byte { + SECOND = 0, + MILLISECOND = 1, + MICROSECOND = 2, + NANOSECOND = 3 +} + +table PrimitiveArray { + /// Storage type of the values. For a CATEGORY column this describes the + /// integer codes; the levels are stored as a separate PrimitiveArray + type: Type; + + /// NOTE(review): the C++ TableReader in this patch does not consult this field + encoding: Encoding = PLAIN; + + /// Byte position where this array's data starts (null bitmap first, when + /// present). The writer stores its output-stream position (Tell) here and + /// the reader passes the value to ReadAt + offset: long; + + /// The number of logical values in the array + length: long; + + /// The number of observed nulls. A null bitmap is stored only when this is + /// greater than zero + null_count: long; + + /// Total bytes this array occupies in the file: padded null bitmap (if any), + /// padded offsets (UTF8/BINARY only) and padded values + total_bytes: long; + + /// TODO: Compression +} + +table CategoryMetadata { + /// The category codes are presumed to be integers that are valid indexes into + /// the levels array + + /// Distinct category values (the dictionary), stored as their own array + levels: PrimitiveArray; + /// Whether the categories have a meaningful order + ordered: bool = false; +} + +table TimestampMetadata { + unit: TimeUnit; + + /// Timestamp data is assumed to be UTC, but the time zone is stored here for + /// presentation as localized + timezone: string; +} + +/// Marker for DATE columns (no fields); the C++ reader maps it to date32 +table DateMetadata { +} + +table TimeMetadata { +
unit: TimeUnit; +} + +union TypeMetadata { + CategoryMetadata, + TimestampMetadata, + DateMetadata, + TimeMetadata, +} + +table Column { + name: string; + values: PrimitiveArray; + /// Extra type information for CATEGORY, TIMESTAMP, DATE and TIME columns + /// (presumably NONE for plain primitive columns) + metadata: TypeMetadata; + + /// This should (probably) be JSON + user_metadata: string; +} + +table CTable { + /// Some text (or a name) metadata about what the file is, optional + description: string; + + /// Number of rows, as set through TableWriter::SetNumRows + num_rows: long; + /// Column descriptors, in file order + columns: [Column]; + + /// Version number of the Feather format + version: int; + + /// Table metadata (likely JSON), not yet used + metadata: string; +} + +root_type CTable; http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/cpp/src/arrow/ipc/feather.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/feather.h b/cpp/src/arrow/ipc/feather.h new file mode 100644 index 0000000..3d370df --- /dev/null +++ b/cpp/src/arrow/ipc/feather.h @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +/// Public API for the "Feather" file format, originally created at +/// http://github.com/wesm/feather + +#ifndef ARROW_IPC_FEATHER_H +#define ARROW_IPC_FEATHER_H + +#include <cstdint> +#include <memory> +#include <string> +#include <vector> + +#include "arrow/type.h" + +namespace arrow { + +class Buffer; +class Column; +class Status; + +namespace io { + +class OutputStream; +class ReadableFileInterface; + +} // namespace io + +namespace ipc { +namespace feather { + +/// Feather format version number. NOTE(review): presumably the value the +/// writer stores in CTable.version -- confirm in TableBuilder +static constexpr const int kFeatherVersion = 2; + +// ---------------------------------------------------------------------- +// Metadata accessor classes + +/// Reader for a Feather file. Open() checks the leading and trailing magic +/// bytes and the metadata length stored in the footer, then parses the +/// flatbuffer metadata; column data is only read later, by GetColumn(). +class ARROW_EXPORT TableReader { + public: + TableReader(); + ~TableReader(); + + /// Validate and parse the file held by `source`. Returns Status::Invalid if + /// the file is too small, either magic-byte check fails, or the footer + /// announces more metadata than the file contains. + Status Open(const std::shared_ptr<io::ReadableFileInterface>& source); + + /// Memory-map the file at `abspath` read-only and Open() it into a new + /// TableReader stored in `out`. + static Status OpenFile(const std::string& abspath, std::unique_ptr<TableReader>* out); + + // Optional table description + // + // This does not return a const std::string& because a string has to be + // copied from the flatbuffer to be able to return a non-flatbuffer type + std::string GetDescription() const; + bool HasDescription() const; + + /// Format version as reported by the file metadata. + int version() const; + + int64_t num_rows() const; + int64_t num_columns() const; + + /// Name of the zero-based column `i`. NOTE(review): no range check is + /// visible in the implementation -- callers should ensure + /// 0 <= i < num_columns(). + std::string GetColumnName(int i) const; + + /// Read the data of column `i` from the source and return it as a Column + /// (same range requirement as GetColumnName). + Status GetColumn(int i, std::shared_ptr<Column>* out); + + private: + class ARROW_NO_EXPORT TableReaderImpl; + std::unique_ptr<TableReaderImpl> impl_; +}; + +/// Writer for a Feather file. The leading magic bytes are written lazily on +/// the first Append() or Finalize(); each Append() writes one column's data +/// immediately, and Finalize() adds the flatbuffer metadata and a footer +/// (metadata length followed by the magic bytes). +class ARROW_EXPORT TableWriter { + public: + ~TableWriter(); + + /// Create a writer that emits to `stream`. + static Status Open( + const std::shared_ptr<io::OutputStream>& stream, std::unique_ptr<TableWriter>* out); + + /// Open `abspath` as an io::FileOutputStream and create a writer for it. + static Status OpenFile(const std::string& abspath, std::unique_ptr<TableWriter>* out); + + /// Optional free-text description stored in the file metadata. + void SetDescription(const std::string& desc); + /// Row count stored in the file metadata. NOTE(review): the writer does + /// not appear to check it against the lengths of the appended arrays. + void SetNumRows(int64_t num_rows); + + /// Write `values` as a new column called `name`. Handled array types are + /// boolean, integer, floating point, binary/string, dictionary (integer + /// indices only), timestamp, date32 and time. + Status Append(const std::string& name, const Array& values); + + /// We are done: write the file metadata and footer, then close the stream + Status Finalize(); + + private: + TableWriter(); + class ARROW_NO_EXPORT TableWriterImpl; +
std::unique_ptr<TableWriterImpl> impl_; +}; + +} // namespace feather +} // namespace ipc +} // namespace arrow + +#endif // ARROW_IPC_FEATHER_H http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/python/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index 6e6d609..ef874e3 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -437,6 +437,7 @@ set(CYTHON_EXTENSIONS config error io + _feather memory scalar schema http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/python/pyarrow/_feather.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/_feather.pyx b/python/pyarrow/_feather.pyx new file mode 100644 index 0000000..67f734f --- /dev/null +++ b/python/pyarrow/_feather.pyx @@ -0,0 +1,158 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +# cython: profile=False +# distutils: language = c++ +# cython: embedsignature = True + +from cython.operator cimport dereference as deref + +from pyarrow.includes.common cimport * +from pyarrow.includes.libarrow cimport CArray, CColumn, CSchema, CStatus +from pyarrow.includes.libarrow_io cimport ReadableFileInterface, OutputStream + +from libcpp.string cimport string +from libcpp cimport bool as c_bool + +cimport cpython + +from pyarrow.compat import frombytes, tobytes, encode_file_path + +from pyarrow.array cimport Array +from pyarrow.error cimport check_status +from pyarrow.table cimport Column + +# These declarations must mirror arrow/ipc/feather.h; a member declared here +# but missing there only fails later, when the generated C++ is compiled +cdef extern from "arrow/ipc/feather.h" namespace "arrow::ipc::feather" nogil: + + cdef cppclass TableWriter: + @staticmethod + CStatus Open(const shared_ptr[OutputStream]& stream, + unique_ptr[TableWriter]* out) + + @staticmethod + CStatus OpenFile(const string& abspath, unique_ptr[TableWriter]* out) + + void SetDescription(const string& desc) + void SetNumRows(int64_t num_rows) + + CStatus Append(const string& name, const CArray& values) + CStatus Finalize() + + cdef cppclass TableReader: + # Default-constructible; a source is attached with Open() or OpenFile() + CStatus Open(const shared_ptr[ReadableFileInterface]& source) + + @staticmethod + CStatus OpenFile(const string& abspath, unique_ptr[TableReader]* out) + + string GetDescription() + c_bool HasDescription() + + int version() + int64_t num_rows() + int64_t num_columns() + + CStatus GetColumn(int i, shared_ptr[CColumn]* out) + c_string GetColumnName(int i) + + +class FeatherError(Exception): + pass + + +cdef class FeatherWriter: + ''' + Low-level Feather writer wrapping arrow::ipc::feather::TableWriter. + + Call open() first, then write_array() once per column, then close(). + ''' + cdef: + unique_ptr[TableWriter] writer + + cdef public: + int64_t num_rows + + def __cinit__(self): + # -1 means "no column written yet" + self.num_rows = -1 + + def open(self, object dest): + cdef: + string c_name = encode_file_path(dest) + + check_status(TableWriter.OpenFile(c_name, &self.writer)) + + def close(self): + # Record the row count (0 for a table without columns), then write the + # file metadata and footer + if self.num_rows < 0: + self.num_rows = 0 + self.writer.get().SetNumRows(self.num_rows) + check_status(self.writer.get().Finalize()) + + def 
write_array(self, object name, object col, object mask=None): + ''' + Append ``col`` as a column called ``name``. + + ``col`` is either a pyarrow Array or any object accepted by + Array.from_pandas, to which ``mask`` is forwarded. Every column must + have the same length as the first one written. + ''' + cdef Array arr + + if self.num_rows >= 0: + if len(col) != self.num_rows: + raise ValueError('prior column had a different number of rows') + else: + self.num_rows = len(col) + + if isinstance(col, Array): + arr = col + else: + arr = Array.from_pandas(col, mask=mask) + + cdef c_string c_name = tobytes(name) + + with nogil: + check_status( + self.writer.get().Append(c_name, deref(arr.sp_array))) + + +cdef class FeatherReader: + ''' + Low-level Feather reader wrapping arrow::ipc::feather::TableReader. + + open() memory-maps the file; columns are then read by index. + ''' + cdef: + unique_ptr[TableReader] reader + + def __cinit__(self): + pass + + def open(self, source): + cdef: + string c_name = encode_file_path(source) + + check_status(TableReader.OpenFile(c_name, &self.reader)) + + property num_rows: + + def __get__(self): + return self.reader.get().num_rows() + + property num_columns: + + def __get__(self): + return self.reader.get().num_columns() + + def get_column_name(self, int i): + # The C++ GetColumnName indexes the metadata with no visible range + # check, so validate here exactly as get_column does + if i < 0 or i >= self.num_columns: + raise IndexError(i) + + cdef c_string name = self.reader.get().GetColumnName(i) + return frombytes(name) + + def get_column(self, int i): + if i < 0 or i >= self.num_columns: + raise IndexError(i) + + cdef shared_ptr[CColumn] sp_column + with nogil: + check_status(self.reader.get() + .GetColumn(i, &sp_column)) + + cdef Column col = Column() + col.init(sp_column) + return col http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/python/pyarrow/feather.py ---------------------------------------------------------------------- diff --git 
a/python/pyarrow/feather.py b/python/pyarrow/feather.py new file mode 100644 index 0000000..b7dbf96 --- /dev/null +++ b/python/pyarrow/feather.py @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import six +from distutils.version import LooseVersion +import pandas as pd + +from pyarrow._feather import FeatherError # noqa +from pyarrow.table import Table +import pyarrow._feather as ext + + +if LooseVersion(pd.__version__) < '0.17.0': + raise ImportError("feather requires pandas >= 0.17.0") + +if LooseVersion(pd.__version__) < '0.19.0': + pdapi = pd.core.common +else: + pdapi = pd.api.types + + +class FeatherReader(ext.FeatherReader): + + def __init__(self, source): + self.source = source + self.open(source) + + def read(self, columns=None): + if columns is not None: + column_set = set(columns) + else: + column_set = None + + columns = [] + names = [] + for i in range(self.num_columns): + name = self.get_column_name(i) + if column_set is None or name in column_set: + col = self.get_column(i) + columns.append(col) + names.append(name) + + table = Table.from_arrays(columns, names=names) + return table.to_pandas() + + +def write_feather(df, path): + ''' + Write a pandas.DataFrame to Feather format + ''' + writer = ext.FeatherWriter() + writer.open(path) + + if isinstance(df, pd.SparseDataFrame): + df = df.to_dense() + + if not df.columns.is_unique: + raise ValueError("cannot serialize duplicate column names") + + # TODO(wesm): pipeline conversion to Arrow memory layout + for i, name in enumerate(df.columns): + col = df.iloc[:, i] + + if pdapi.is_object_dtype(col): + inferred_type = pd.lib.infer_dtype(col) + msg = ("cannot serialize column {n} " + "named {name} with dtype {dtype}".format( + n=i, name=name, dtype=inferred_type)) + + if inferred_type in 
['mixed']: + + # allow columns with nulls + an inferable type + inferred_type = pd.lib.infer_dtype(col[col.notnull()]) + if inferred_type in ['mixed']: + raise ValueError(msg) + + elif inferred_type not in ['unicode', 'string']: + raise ValueError(msg) + + if not isinstance(name, six.string_types): + name = str(name) + + writer.write_array(name, col) + + writer.close() + + +def read_feather(path, columns=None): + """ + Read a pandas.DataFrame from Feather format + + Parameters + ---------- + path : string, path to read from + columns : sequence, optional + Only read a specific set of columns. If not provided, all columns are + read + + Returns + ------- + df : pandas.DataFrame + """ + reader = FeatherReader(path) + return reader.read(columns=columns) http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/python/pyarrow/table.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/table.pyx b/python/pyarrow/table.pyx index ad5af1b..5657b97 100644 --- a/python/pyarrow/table.pyx +++ b/python/pyarrow/table.pyx @@ -540,6 +540,11 @@ cdef table_to_blockmanager(const shared_ptr[CTable]& table, int nthreads): block = _int.make_block(cat, placement=placement, klass=_int.CategoricalBlock, fastpath=True) + elif 'timezone' in item: + from pandas.types.api import DatetimeTZDtype + dtype = DatetimeTZDtype('ns', tz=item['timezone']) + block = _int.make_block(block_arr, placement=placement, + dtype=dtype, fastpath=True) else: block = _int.make_block(block_arr, placement=placement) blocks.append(block) http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/python/pyarrow/tests/test_feather.py ---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_feather.py b/python/pyarrow/tests/test_feather.py new file mode 100644 index 0000000..451475b --- /dev/null +++ b/python/pyarrow/tests/test_feather.py @@ -0,0 +1,379 @@ +# Copyright 2016 Feather Developers +# +# Licensed under the 
Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import unittest + +import pytest + +from numpy.testing import assert_array_equal +import numpy as np + +from pandas.util.testing import assert_frame_equal +import pandas as pd + +from pyarrow.compat import guid +from pyarrow.error import ArrowException +from pyarrow.feather import (read_feather, write_feather, + FeatherReader) +from pyarrow._feather import FeatherWriter + + +def random_path(): + return 'feather_{}'.format(guid()) + + +class TestFeatherReader(unittest.TestCase): + + def setUp(self): + self.test_files = [] + + def tearDown(self): + for path in self.test_files: + try: + os.remove(path) + except os.error: + pass + + def test_file_not_exist(self): + with self.assertRaises(ArrowException): + FeatherReader('test_invalid_file') + + def _get_null_counts(self, path, columns=None): + reader = FeatherReader(path) + counts = [] + for i in range(reader.num_columns): + col = reader.get_column(i) + if columns is None or col.name in columns: + counts.append(col.null_count) + + return counts + + def _check_pandas_roundtrip(self, df, expected=None, path=None, + columns=None, null_counts=None): + if path is None: + path = random_path() + + self.test_files.append(path) + write_feather(df, path) + if not os.path.exists(path): + raise Exception('file not written') + + result = read_feather(path, columns) + if expected is None: + expected = df + + assert_frame_equal(result, expected) + + if null_counts is None: + null_counts = 
np.zeros(len(expected.columns)) + + np.testing.assert_array_equal(self._get_null_counts(path, columns), + null_counts) + + def _assert_error_on_write(self, df, exc, path=None): + # check that we are raising the exception + # on writing + + if path is None: + path = random_path() + + self.test_files.append(path) + + def f(): + write_feather(df, path) + + self.assertRaises(exc, f) + + def test_num_rows_attr(self): + df = pd.DataFrame({'foo': [1, 2, 3, 4, 5]}) + path = random_path() + self.test_files.append(path) + write_feather(df, path) + + reader = FeatherReader(path) + assert reader.num_rows == len(df) + + df = pd.DataFrame({}) + path = random_path() + self.test_files.append(path) + write_feather(df, path) + + reader = FeatherReader(path) + assert reader.num_rows == 0 + + def test_float_no_nulls(self): + data = {} + numpy_dtypes = ['f4', 'f8'] + num_values = 100 + + for dtype in numpy_dtypes: + values = np.random.randn(num_values) + data[dtype] = values.astype(dtype) + + df = pd.DataFrame(data) + self._check_pandas_roundtrip(df) + + def test_float_nulls(self): + num_values = 100 + + path = random_path() + self.test_files.append(path) + writer = FeatherWriter() + writer.open(path) + + null_mask = np.random.randint(0, 10, size=num_values) < 3 + dtypes = ['f4', 'f8'] + expected_cols = [] + null_counts = [] + for name in dtypes: + values = np.random.randn(num_values).astype(name) + writer.write_array(name, values, null_mask) + + values[null_mask] = np.nan + + expected_cols.append(values) + null_counts.append(null_mask.sum()) + + writer.close() + + ex_frame = pd.DataFrame(dict(zip(dtypes, expected_cols)), + columns=dtypes) + + result = read_feather(path) + assert_frame_equal(result, ex_frame) + assert_array_equal(self._get_null_counts(path), null_counts) + + def test_integer_no_nulls(self): + data = {} + + numpy_dtypes = ['i1', 'i2', 'i4', 'i8', + 'u1', 'u2', 'u4', 'u8'] + num_values = 100 + + for dtype in numpy_dtypes: + values = np.random.randint(0, 100, 
size=num_values) + data[dtype] = values.astype(dtype) + + df = pd.DataFrame(data) + self._check_pandas_roundtrip(df) + + def test_platform_numpy_integers(self): + data = {} + + numpy_dtypes = ['longlong'] + num_values = 100 + + for dtype in numpy_dtypes: + values = np.random.randint(0, 100, size=num_values) + data[dtype] = values.astype(dtype) + + df = pd.DataFrame(data) + self._check_pandas_roundtrip(df) + + def test_integer_with_nulls(self): + # pandas requires upcast to float dtype + path = random_path() + self.test_files.append(path) + + int_dtypes = ['i1', 'i2', 'i4', 'i8', 'u1', 'u2', 'u4', 'u8'] + num_values = 100 + + writer = FeatherWriter() + writer.open(path) + + null_mask = np.random.randint(0, 10, size=num_values) < 3 + expected_cols = [] + for name in int_dtypes: + values = np.random.randint(0, 100, size=num_values) + writer.write_array(name, values, null_mask) + + expected = values.astype('f8') + expected[null_mask] = np.nan + + expected_cols.append(expected) + + ex_frame = pd.DataFrame(dict(zip(int_dtypes, expected_cols)), + columns=int_dtypes) + + writer.close() + + result = read_feather(path) + assert_frame_equal(result, ex_frame) + + def test_boolean_no_nulls(self): + num_values = 100 + + np.random.seed(0) + + df = pd.DataFrame({'bools': np.random.randn(num_values) > 0}) + self._check_pandas_roundtrip(df) + + def test_boolean_nulls(self): + # pandas requires upcast to object dtype + path = random_path() + self.test_files.append(path) + + num_values = 100 + np.random.seed(0) + + writer = FeatherWriter() + writer.open(path) + + mask = np.random.randint(0, 10, size=num_values) < 3 + values = np.random.randint(0, 10, size=num_values) < 5 + writer.write_array('bools', values, mask) + + expected = values.astype(object) + expected[mask] = None + + writer.close() + + ex_frame = pd.DataFrame({'bools': expected}) + + result = read_feather(path) + assert_frame_equal(result, ex_frame) + + def test_boolean_object_nulls(self): + repeats = 100 + arr = 
np.array([False, None, True] * repeats, dtype=object) + df = pd.DataFrame({'bools': arr}) + self._check_pandas_roundtrip(df, null_counts=[1 * repeats]) + + def test_strings(self): + repeats = 1000 + + # we have mixed bytes, unicode, strings + values = [b'foo', None, u'bar', 'qux', np.nan] + df = pd.DataFrame({'strings': values * repeats}) + self._assert_error_on_write(df, ValueError) + + # embedded nulls are ok + values = ['foo', None, 'bar', 'qux', None] + df = pd.DataFrame({'strings': values * repeats}) + expected = pd.DataFrame({'strings': values * repeats}) + self._check_pandas_roundtrip(df, expected, null_counts=[2 * repeats]) + + values = ['foo', None, 'bar', 'qux', np.nan] + df = pd.DataFrame({'strings': values * repeats}) + expected = pd.DataFrame({'strings': values * repeats}) + self._check_pandas_roundtrip(df, expected, null_counts=[2 * repeats]) + + def test_empty_strings(self): + df = pd.DataFrame({'strings': [''] * 10}) + self._check_pandas_roundtrip(df) + + def test_nan_as_null(self): + # Create a nan that is not numpy.nan + values = np.array(['foo', np.nan, np.nan * 2, 'bar'] * 10) + df = pd.DataFrame({'strings': values}) + self._check_pandas_roundtrip(df) + + def test_category(self): + repeats = 1000 + values = ['foo', None, u'bar', 'qux', np.nan] + df = pd.DataFrame({'strings': values * repeats}) + df['strings'] = df['strings'].astype('category') + + values = ['foo', None, 'bar', 'qux', None] + expected = pd.DataFrame({'strings': pd.Categorical(values * repeats)}) + self._check_pandas_roundtrip(df, expected, + null_counts=[2 * repeats]) + + @pytest.mark.xfail + def test_timestamp(self): + df = pd.DataFrame({'naive': pd.date_range('2016-03-28', periods=10)}) + df['with_tz'] = (df.naive.dt.tz_localize('utc') + .dt.tz_convert('America/Los_Angeles')) + + self._check_pandas_roundtrip(df) + + @pytest.mark.xfail + def test_timestamp_with_nulls(self): + df = pd.DataFrame({'test': [pd.datetime(2016, 1, 1), + None, + pd.datetime(2016, 1, 3)]}) + 
df['with_tz'] = df.test.dt.tz_localize('utc') + + self._check_pandas_roundtrip(df, null_counts=[1, 1]) + + @pytest.mark.xfail + def test_out_of_float64_timestamp_with_nulls(self): + df = pd.DataFrame( + {'test': pd.DatetimeIndex([1451606400000000001, + None, 14516064000030405])}) + df['with_tz'] = df.test.dt.tz_localize('utc') + self._check_pandas_roundtrip(df, null_counts=[1, 1]) + + def test_non_string_columns(self): + df = pd.DataFrame({0: [1, 2, 3, 4], + 1: [True, False, True, False]}) + + expected = df.rename(columns=str) + self._check_pandas_roundtrip(df, expected) + + def test_unicode_filename(self): + # GH #209 + name = (b'Besa_Kavaj\xc3\xab.feather').decode('utf-8') + df = pd.DataFrame({'foo': [1, 2, 3, 4]}) + self._check_pandas_roundtrip(df, path=name) + + def test_read_columns(self): + data = {'foo': [1, 2, 3, 4], + 'boo': [5, 6, 7, 8], + 'woo': [1, 3, 5, 7]} + columns = list(data.keys())[1:3] + df = pd.DataFrame(data) + expected = pd.DataFrame({c: data[c] for c in columns}) + self._check_pandas_roundtrip(df, expected, columns=columns) + + def test_overwritten_file(self): + path = random_path() + + num_values = 100 + np.random.seed(0) + + values = np.random.randint(0, 10, size=num_values) + write_feather(pd.DataFrame({'ints': values}), path) + + df = pd.DataFrame({'ints': values[0: num_values//2]}) + self._check_pandas_roundtrip(df, path=path) + + def test_sparse_dataframe(self): + # GH #221 + data = {'A': [0, 1, 2], + 'B': [1, 0, 1]} + df = pd.DataFrame(data).to_sparse(fill_value=1) + expected = df.to_dense() + self._check_pandas_roundtrip(df, expected) + + def test_duplicate_columns(self): + + # https://github.com/wesm/feather/issues/53 + # not currently able to handle duplicate columns + df = pd.DataFrame(np.arange(12).reshape(4, 3), + columns=list('aaa')).copy() + self._assert_error_on_write(df, ValueError) + + def test_unsupported(self): + # https://github.com/wesm/feather/issues/240 + # serializing actual python objects + + # period + df = 
pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)}) + self._assert_error_on_write(df, ValueError) + + # non-strings + df = pd.DataFrame({'a': ['a', 1, 2.0]}) + self._assert_error_on_write(df, ValueError) http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index b0f29be..a0573fe 100644 --- a/python/setup.py +++ b/python/setup.py @@ -101,6 +101,7 @@ class build_ext(_build_ext): 'io', 'jemalloc', 'memory', + '_feather', '_parquet', 'scalar', 'schema', http://git-wip-us.apache.org/repos/asf/arrow/blob/d99958dd/python/src/pyarrow/adapters/pandas.cc ---------------------------------------------------------------------- diff --git a/python/src/pyarrow/adapters/pandas.cc b/python/src/pyarrow/adapters/pandas.cc index c707ada..eb3ab49 100644 --- a/python/src/pyarrow/adapters/pandas.cc +++ b/python/src/pyarrow/adapters/pandas.cc @@ -925,6 +925,32 @@ class DatetimeBlock : public PandasBlock { } }; +// Datetime block for time-zone-aware timestamps. GetPyResult returns a dict +// with "block", "timezone" and "placement" entries. +class DatetimeTZBlock : public DatetimeBlock { + public: + DatetimeTZBlock(const std::string& timezone, int64_t num_rows) + : DatetimeBlock(num_rows, 1), timezone_(timezone) {} + + Status GetPyResult(PyObject** output) override { + PyObject* result = PyDict_New(); + RETURN_IF_PYERROR(); + + PyObject* py_tz = PyUnicode_FromStringAndSize( + timezone_.c_str(), static_cast<Py_ssize_t>(timezone_.size())); + // Release the dict if the time zone string could not be created. + if (py_tz == nullptr) { Py_DECREF(result); } + RETURN_IF_PYERROR(); + + // PyDict_SetItemString does not steal references, so drop our py_tz + // reference once the dict holds its own. + PyDict_SetItemString(result, "block", block_arr_.obj()); + PyDict_SetItemString(result, "timezone", py_tz); + PyDict_SetItemString(result, "placement", 
placement_arr_.obj()); + Py_DECREF(py_tz); + + *output = result; + + return Status::OK(); + } + + private: + std::string timezone_; +}; + template <int ARROW_INDEX_TYPE> class CategoricalBlock : public PandasBlock { public: @@ -1068,6 +1094,8 @@ static inline Status MakeCategoricalBlock(const std::shared_ptr<DataType>& type, return (*block)->Allocate(); } +using BlockMap = 
std::unordered_map<int, std::shared_ptr<PandasBlock>>; + // Construct the exact pandas 0.x "BlockManager" memory layout // // * For each column determine the correct output pandas type @@ -1138,9 +1166,14 @@ class DataFrameBlockCreator { case Type::DATE: output_type = PandasBlock::DATETIME; break; - case Type::TIMESTAMP: - output_type = PandasBlock::DATETIME; - break; + case Type::TIMESTAMP: { + const auto& ts_type = static_cast<const arrow::TimestampType&>(*col->type()); + if (!ts_type.timezone.empty()) { + output_type = PandasBlock::DATETIME_WITH_TZ; + } else { + output_type = PandasBlock::DATETIME; + } + } break; case Type::LIST: { auto list_type = std::static_pointer_cast<ListType>(col->type()); if (!ListTypeSupported(list_type->value_type()->type)) { @@ -1159,10 +1192,15 @@ class DataFrameBlockCreator { } int block_placement = 0; + std::shared_ptr<PandasBlock> block; if (output_type == PandasBlock::CATEGORICAL) { - std::shared_ptr<PandasBlock> block; RETURN_NOT_OK(MakeCategoricalBlock(col->type(), table_->num_rows(), &block)); categorical_blocks_[i] = block; + } else if (output_type == PandasBlock::DATETIME_WITH_TZ) { + const auto& ts_type = static_cast<const arrow::TimestampType&>(*col->type()); + block = std::make_shared<DatetimeTZBlock>(ts_type.timezone, table_->num_rows()); + RETURN_NOT_OK(block->Allocate()); + datetimetz_blocks_[i] = block; } else { auto it = type_counts_.find(output_type); if (it != type_counts_.end()) { @@ -1252,28 +1290,24 @@ class DataFrameBlockCreator { return Status::OK(); } + // Appends the Python result of every block in `blocks` to `list`. + Status AppendBlocks(const BlockMap& blocks, PyObject* list) { + for (const auto& it : blocks) { + PyObject* item; + RETURN_NOT_OK(it.second->GetPyResult(&item)); + // PyList_Append does not steal the reference (unlike PyList_SET_ITEM), + // so release ours whether or not the append succeeded. 
+ const int append_status = PyList_Append(list, item); + Py_DECREF(item); + if (append_status < 0) { RETURN_IF_PYERROR(); } + } + return Status::OK(); + } + Status GetResultList(PyObject** out) { PyAcquireGIL lock; - auto num_blocks = - static_cast<Py_ssize_t>(blocks_.size() + categorical_blocks_.size()); - PyObject* result = PyList_New(num_blocks); + PyObject* 
result = PyList_New(0); RETURN_IF_PYERROR(); - int i = 0; - for (const auto& it : blocks_) { - const std::shared_ptr<PandasBlock> block = it.second; - PyObject* item; - RETURN_NOT_OK(block->GetPyResult(&item)); - if (PyList_SET_ITEM(result, i++, item) < 0) { RETURN_IF_PYERROR(); } - } - - for (const auto& it : categorical_blocks_) { - const std::shared_ptr<PandasBlock> block = it.second; - PyObject* item; - RETURN_NOT_OK(block->GetPyResult(&item)); - if (PyList_SET_ITEM(result, i++, item) < 0) { RETURN_IF_PYERROR(); } - } + Status s = AppendBlocks(blocks_, result); + if (s.ok()) { s = AppendBlocks(categorical_blocks_, result); } + if (s.ok()) { s = AppendBlocks(datetimetz_blocks_, result); } + if (!s.ok()) { + // Do not leak the partially filled list on failure. + Py_DECREF(result); + return s; + } *out = result; return Status::OK(); @@ -1292,10 +1326,13 @@ class DataFrameBlockCreator { std::unordered_map<int, int> type_counts_; // block type -> block - std::unordered_map<int, std::shared_ptr<PandasBlock>> blocks_; + BlockMap blocks_; // column number -> categorical block - std::unordered_map<int, std::shared_ptr<PandasBlock>> categorical_blocks_; + BlockMap categorical_blocks_; + + // column number -> datetimetz block + BlockMap datetimetz_blocks_; }; Status ConvertTableToPandas(