Repository: arrow Updated Branches: refs/heads/master ef9083029 -> 2f52cf4ee
ARROW-215: Support other integer types and strings in Parquet I/O Change-Id: I72c6c82bc38c895a04172531bebbc78d4fb08732 Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2f52cf4e Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2f52cf4e Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2f52cf4e Branch: refs/heads/master Commit: 2f52cf4eed1033d1bf1f043d9063e462e60d6605 Parents: ef90830 Author: Uwe L. Korn <uw...@xhochy.com> Authored: Sun Jun 12 11:48:10 2016 +0200 Committer: Uwe L. Korn <uw...@xhochy.com> Committed: Tue Jun 28 21:32:32 2016 +0200 ---------------------------------------------------------------------- cpp/src/arrow/parquet/parquet-io-test.cc | 461 ++++++++++++++-------- cpp/src/arrow/parquet/parquet-schema-test.cc | 4 +- cpp/src/arrow/parquet/reader.cc | 160 +++++++- cpp/src/arrow/parquet/schema.cc | 47 ++- cpp/src/arrow/parquet/schema.h | 9 +- cpp/src/arrow/parquet/test-util.h | 136 ++++++- cpp/src/arrow/parquet/writer.cc | 234 +++++++++-- cpp/src/arrow/parquet/writer.h | 9 +- cpp/src/arrow/test-util.h | 2 + cpp/src/arrow/types/primitive.cc | 5 + python/pyarrow/includes/parquet.pxd | 13 +- python/pyarrow/parquet.pyx | 22 +- python/pyarrow/tests/test_parquet.py | 43 +- 13 files changed, 901 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/parquet-io-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc index edcac88..572cae1 100644 --- a/cpp/src/arrow/parquet/parquet-io-test.cc +++ b/cpp/src/arrow/parquet/parquet-io-test.cc @@ -21,7 +21,9 @@ #include "arrow/parquet/test-util.h" #include "arrow/parquet/reader.h" #include "arrow/parquet/writer.h" +#include "arrow/types/construct.h" #include "arrow/types/primitive.h" 
+#include "arrow/types/string.h" #include "arrow/util/memory-pool.h" #include "arrow/util/status.h" @@ -30,12 +32,15 @@ using ParquetBuffer = parquet::Buffer; using parquet::BufferReader; +using parquet::default_writer_properties; using parquet::InMemoryOutputStream; +using parquet::LogicalType; using parquet::ParquetFileReader; using parquet::ParquetFileWriter; using parquet::RandomAccessSource; using parquet::Repetition; using parquet::SchemaDescriptor; +using parquet::ParquetVersion; using ParquetType = parquet::Type; using parquet::schema::GroupNode; using parquet::schema::NodePtr; @@ -52,25 +57,113 @@ template <typename TestType> struct test_traits {}; template <> +struct test_traits<BooleanType> { + static constexpr ParquetType::type parquet_enum = ParquetType::BOOLEAN; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static uint8_t const value; +}; + +const uint8_t test_traits<BooleanType>::value(1); + +template <> +struct test_traits<UInt8Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_8; + static uint8_t const value; +}; + +const uint8_t test_traits<UInt8Type>::value(64); + +template <> +struct test_traits<Int8Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::INT_8; + static int8_t const value; +}; + +const int8_t test_traits<Int8Type>::value(-64); + +template <> +struct test_traits<UInt16Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_16; + static uint16_t const value; +}; + +const uint16_t test_traits<UInt16Type>::value(1024); + +template <> +struct test_traits<Int16Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::INT_16; + static int16_t 
const value; +}; + +const int16_t test_traits<Int16Type>::value(-1024); + +template <> +struct test_traits<UInt32Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_32; + static uint32_t const value; +}; + +const uint32_t test_traits<UInt32Type>::value(1024); + +template <> struct test_traits<Int32Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT32; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static int32_t const value; +}; + +const int32_t test_traits<Int32Type>::value(-1024); + +template <> +struct test_traits<UInt64Type> { + static constexpr ParquetType::type parquet_enum = ParquetType::INT64; + static constexpr LogicalType::type logical_enum = LogicalType::UINT_64; + static uint64_t const value; }; +const uint64_t test_traits<UInt64Type>::value(1024); + template <> struct test_traits<Int64Type> { static constexpr ParquetType::type parquet_enum = ParquetType::INT64; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static int64_t const value; }; +const int64_t test_traits<Int64Type>::value(-1024); + template <> struct test_traits<FloatType> { static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static float const value; }; +const float test_traits<FloatType>::value(2.1f); + template <> struct test_traits<DoubleType> { static constexpr ParquetType::type parquet_enum = ParquetType::DOUBLE; + static constexpr LogicalType::type logical_enum = LogicalType::NONE; + static double const value; +}; + +const double test_traits<DoubleType>::value(4.2); + +template <> +struct test_traits<StringType> { + static constexpr ParquetType::type parquet_enum = ParquetType::BYTE_ARRAY; + static constexpr LogicalType::type logical_enum = LogicalType::UTF8; + static std::string const value; }; +const std::string 
test_traits<StringType>::value("Test"); + template <typename T> using ParquetDataType = ::parquet::DataType<test_traits<T>::parquet_enum>; @@ -80,18 +173,18 @@ using ParquetWriter = ::parquet::TypedColumnWriter<ParquetDataType<T>>; template <typename TestType> class TestParquetIO : public ::testing::Test { public: - typedef typename TestType::c_type T; virtual void SetUp() {} - std::shared_ptr<GroupNode> MakeSchema( - ParquetType::type parquet_type, Repetition::type repetition) { - auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type); + std::shared_ptr<GroupNode> MakeSchema(Repetition::type repetition) { + auto pnode = PrimitiveNode::Make("column1", repetition, + test_traits<TestType>::parquet_enum, test_traits<TestType>::logical_enum); NodePtr node_ = GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode})); return std::static_pointer_cast<GroupNode>(node_); } - std::unique_ptr<ParquetFileWriter> MakeWriter(std::shared_ptr<GroupNode>& schema) { + std::unique_ptr<ParquetFileWriter> MakeWriter( + const std::shared_ptr<GroupNode>& schema) { sink_ = std::make_shared<InMemoryOutputStream>(); return ParquetFileWriter::Open(sink_, schema); } @@ -106,113 +199,74 @@ class TestParquetIO : public ::testing::Test { std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) { arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader; - ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader))); + ASSERT_OK_NO_THROW(reader.GetFlatColumn(0, &column_reader)); ASSERT_NE(nullptr, column_reader.get()); + ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out)); ASSERT_NE(nullptr, out->get()); } + void ReadAndCheckSingleColumnFile(Array* values) { + std::shared_ptr<Array> out; + ReadSingleColumnFile(ReaderFromSink(), &out); + ASSERT_TRUE(values->Equals(out)); + } + void ReadTableFromFile( std::unique_ptr<ParquetFileReader> file_reader, 
std::shared_ptr<Table>* out) { arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader)); - ASSERT_NO_THROW(ASSERT_OK(reader.ReadFlatTable(out))); + ASSERT_OK_NO_THROW(reader.ReadFlatTable(out)); ASSERT_NE(nullptr, out->get()); } - std::unique_ptr<ParquetFileReader> TestFile(std::vector<T>& values, int num_chunks) { - std::shared_ptr<GroupNode> schema = - MakeSchema(test_traits<TestType>::parquet_enum, Repetition::REQUIRED); - std::unique_ptr<ParquetFileWriter> file_writer = MakeWriter(schema); - size_t chunk_size = values.size() / num_chunks; - for (int i = 0; i < num_chunks; i++) { - auto row_group_writer = file_writer->AppendRowGroup(chunk_size); - auto column_writer = - static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn()); - T* data = values.data() + i * chunk_size; - column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); - column_writer->Close(); - row_group_writer->Close(); - } - file_writer->Close(); - return ReaderFromSink(); + void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) { + std::shared_ptr<Table> out; + ReadTableFromFile(ReaderFromSink(), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(values->length(), out->num_rows()); + + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); + } + + template <typename ArrayType> + void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema, + const std::shared_ptr<ArrayType>& values) { + FileWriter writer(default_memory_pool(), MakeWriter(schema)); + ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length())); + ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get())); + ASSERT_OK_NO_THROW(writer.Close()); } std::shared_ptr<InMemoryOutputStream> sink_; }; -typedef ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType> TestTypes; - -TYPED_TEST_CASE(TestParquetIO, TestTypes); - -TYPED_TEST(TestParquetIO, 
SingleColumnRequiredRead) { - std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128); - std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1); - - std::shared_ptr<Array> out; - this->ReadSingleColumnFile(std::move(file_reader), &out); - - ExpectArray<typename TypeParam::c_type>(values.data(), out.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnRequiredTableRead) { - std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128); - std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 1); - - std::shared_ptr<Table> out; - this->ReadTableFromFile(std::move(file_reader), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(SMALL_SIZE, out->num_rows()); - - std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedRead) { - std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128); - std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4); - - std::shared_ptr<Array> out; - this->ReadSingleColumnFile(std::move(file_reader), &out); +// We have separate tests for UInt32Type as this is currently the only type +// where a roundtrip does not yield the identical Array structure. +// There we write an UInt32 Array but receive an Int64 Array as result for +// Parquet version 1.0. 
- ExpectArray<typename TypeParam::c_type>(values.data(), out.get()); -} - -TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedTableRead) { - std::vector<typename TypeParam::c_type> values(SMALL_SIZE, 128); - std::unique_ptr<ParquetFileReader> file_reader = this->TestFile(values, 4); - - std::shared_ptr<Table> out; - this->ReadTableFromFile(std::move(file_reader), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(SMALL_SIZE, out->num_rows()); +typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type, + Int32Type, UInt64Type, Int64Type, FloatType, DoubleType, StringType> TestTypes; - std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ExpectArray<typename TypeParam::c_type>(values.data(), chunked_array->chunk(0).get()); -} +TYPED_TEST_CASE(TestParquetIO, TestTypes); TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) { - std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128); + auto values = NonNullArray<TypeParam>(SMALL_SIZE); - std::shared_ptr<GroupNode> schema = - this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED); - FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); - ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED); + this->WriteFlatColumn(schema, values); - std::shared_ptr<Array> out; - this->ReadSingleColumnFile(this->ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); + this->ReadAndCheckSingleColumnFile(values.get()); } TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { - std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128); + auto values = NonNullArray<TypeParam>(SMALL_SIZE); std::shared_ptr<Table> table = MakeSimpleTable(values, false); 
this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_NO_THROW(ASSERT_OK( - WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length()))); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); std::shared_ptr<Table> out; this->ReadTableFromFile(this->ReaderFromSink(), &out); @@ -226,113 +280,208 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) { TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) { // This also tests max_definition_level = 1 - std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10); + auto values = NullableArray<TypeParam>(SMALL_SIZE, 10); - std::shared_ptr<GroupNode> schema = - this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL); - FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get()))); - ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL); + this->WriteFlatColumn(schema, values); - std::shared_ptr<Array> out; - this->ReadSingleColumnFile(this->ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); + this->ReadAndCheckSingleColumnFile(values.get()); } TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { // This also tests max_definition_level = 1 - std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10); + std::shared_ptr<Array> values = NullableArray<TypeParam>(SMALL_SIZE, 10); std::shared_ptr<Table> table = MakeSimpleTable(values, true); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_NO_THROW(ASSERT_OK( - WriteFlatTable(table.get(), default_memory_pool(), this->sink_, values->length()))); - - std::shared_ptr<Table> out; - this->ReadTableFromFile(this->ReaderFromSink(), &out); - 
ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(SMALL_SIZE, out->num_rows()); + ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, + values->length(), default_writer_properties())); - std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); + this->ReadAndCheckSingleColumnTable(values); } -TYPED_TEST(TestParquetIO, SingleColumnIntRequiredChunkedWrite) { - std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(SMALL_SIZE, 128); - std::shared_ptr<PrimitiveArray> values_chunk = - NonNullArray<TypeParam>(SMALL_SIZE / 4, 128); +TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { + auto values = NonNullArray<TypeParam>(SMALL_SIZE); + int64_t chunk_size = values->length() / 4; - std::shared_ptr<GroupNode> schema = - this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::REQUIRED); + std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED); FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); for (int i = 0; i < 4; i++) { - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get()))); + ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); + ASSERT_OK_NO_THROW( + writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); } - ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + ASSERT_OK_NO_THROW(writer.Close()); - std::shared_ptr<Array> out; - this->ReadSingleColumnFile(this->ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); + this->ReadAndCheckSingleColumnFile(values.get()); } TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) { - std::shared_ptr<PrimitiveArray> values = NonNullArray<TypeParam>(LARGE_SIZE, 128); + auto values = NonNullArray<TypeParam>(LARGE_SIZE); std::shared_ptr<Table> table = MakeSimpleTable(values, false); this->sink_ = 
std::make_shared<InMemoryOutputStream>(); - ASSERT_NO_THROW( - ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512))); - - std::shared_ptr<Table> out; - this->ReadTableFromFile(this->ReaderFromSink(), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(LARGE_SIZE, out->num_rows()); + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); - std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); + this->ReadAndCheckSingleColumnTable(values); } TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) { - std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(SMALL_SIZE, 128, 10); - std::shared_ptr<PrimitiveArray> values_chunk_nulls = - NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 10); - std::shared_ptr<PrimitiveArray> values_chunk = - NullableArray<TypeParam>(SMALL_SIZE / 4, 128, 0); - - std::shared_ptr<GroupNode> schema = - this->MakeSchema(test_traits<TypeParam>::parquet_enum, Repetition::OPTIONAL); + int64_t chunk_size = SMALL_SIZE / 4; + auto values = NullableArray<TypeParam>(SMALL_SIZE, 10); + + std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL); FileWriter writer(default_memory_pool(), this->MakeWriter(schema)); - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get()))); - for (int i = 0; i < 3; i++) { - ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length()))); - ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get()))); + for (int i = 0; i < 4; i++) { + ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size)); + ASSERT_OK_NO_THROW( + writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size)); } - ASSERT_NO_THROW(ASSERT_OK(writer.Close())); + 
ASSERT_OK_NO_THROW(writer.Close()); - std::shared_ptr<Array> out; - this->ReadSingleColumnFile(this->ReaderFromSink(), &out); - ASSERT_TRUE(values->Equals(out)); + this->ReadAndCheckSingleColumnFile(values.get()); } TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) { // This also tests max_definition_level = 1 - std::shared_ptr<PrimitiveArray> values = NullableArray<TypeParam>(LARGE_SIZE, 128, 100); + auto values = NullableArray<TypeParam>(LARGE_SIZE, 100); std::shared_ptr<Table> table = MakeSimpleTable(values, true); this->sink_ = std::make_shared<InMemoryOutputStream>(); - ASSERT_NO_THROW( - ASSERT_OK(WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512))); + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties())); - std::shared_ptr<Table> out; - this->ReadTableFromFile(this->ReaderFromSink(), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(LARGE_SIZE, out->num_rows()); + this->ReadAndCheckSingleColumnTable(values); +} - std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); +using TestUInt32ParquetIO = TestParquetIO<UInt32Type>; + +TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) { + // This also tests max_definition_level = 1 + std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100); + std::shared_ptr<Table> table = MakeSimpleTable(values, true); + + // Parquet 2.0 roundtrip should yield an uint32_t column again + this->sink_ = std::make_shared<InMemoryOutputStream>(); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::WriterProperties::Builder() + .version(ParquetVersion::PARQUET_2_0) + ->build(); + ASSERT_OK_NO_THROW( + WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); + this->ReadAndCheckSingleColumnTable(values); +} + +TEST_F(TestUInt32ParquetIO, 
Parquet_1_0_Compability) { + // This also tests max_definition_level = 1 + std::shared_ptr<PrimitiveArray> values = NullableArray<UInt32Type>(LARGE_SIZE, 100); + std::shared_ptr<Table> table = MakeSimpleTable(values, true); + + // Parquet 1.0 returns an int64_t column as there is no way to tell a Parquet 1.0 + // reader that a column is unsigned. + this->sink_ = std::make_shared<InMemoryOutputStream>(); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::WriterProperties::Builder() + .version(ParquetVersion::PARQUET_1_0) + ->build(); + ASSERT_OK_NO_THROW( + WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties)); + + std::shared_ptr<Array> expected_values; + std::shared_ptr<PoolBuffer> int64_data = + std::make_shared<PoolBuffer>(default_memory_pool()); + { + ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length())); + int64_t* int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data()); + const uint32_t* uint32_data_ptr = + reinterpret_cast<const uint32_t*>(values->data()->data()); + // std::copy might be faster but this is explicit on the casts) + for (int64_t i = 0; i < values->length(); i++) { + int64_data_ptr[i] = static_cast<int64_t>(uint32_data_ptr[i]); + } + } + ASSERT_OK(MakePrimitiveArray(std::make_shared<Int64Type>(), values->length(), + int64_data, values->null_count(), values->null_bitmap(), &expected_values)); + this->ReadAndCheckSingleColumnTable(expected_values); +} + +template <typename T> +using ParquetCDataType = typename ParquetDataType<T>::c_type; + +template <typename TestType> +class TestPrimitiveParquetIO : public TestParquetIO<TestType> { + public: + typedef typename TestType::c_type T; + + void TestFile(std::vector<T>& values, int num_chunks, + std::unique_ptr<ParquetFileReader>* file_reader) { + std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED); + std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema); + size_t chunk_size = 
values.size() / num_chunks; + // Convert to Parquet's expected physical type + std::vector<uint8_t> values_buffer( + sizeof(ParquetCDataType<TestType>) * values.size()); + auto values_parquet = + reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data()); + std::copy(values.cbegin(), values.cend(), values_parquet); + for (int i = 0; i < num_chunks; i++) { + auto row_group_writer = file_writer->AppendRowGroup(chunk_size); + auto column_writer = + static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn()); + ParquetCDataType<TestType>* data = values_parquet + i * chunk_size; + column_writer->WriteBatch(chunk_size, nullptr, nullptr, data); + column_writer->Close(); + row_group_writer->Close(); + } + file_writer->Close(); + *file_reader = this->ReaderFromSink(); + } + + void TestSingleColumnRequiredTableRead(int num_chunks) { + std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value); + std::unique_ptr<ParquetFileReader> file_reader; + ASSERT_NO_THROW(TestFile(values, num_chunks, &file_reader)); + + std::shared_ptr<Table> out; + this->ReadTableFromFile(std::move(file_reader), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(SMALL_SIZE, out->num_rows()); + + std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ExpectArray<TestType>(values.data(), chunked_array->chunk(0).get()); + } + + void TestSingleColumnRequiredRead(int num_chunks) { + std::vector<T> values(SMALL_SIZE, test_traits<TestType>::value); + std::unique_ptr<ParquetFileReader> file_reader; + ASSERT_NO_THROW(TestFile(values, num_chunks, &file_reader)); + + std::shared_ptr<Array> out; + this->ReadSingleColumnFile(std::move(file_reader), &out); + + ExpectArray<TestType>(values.data(), out.get()); + } +}; + +typedef ::testing::Types<BooleanType, UInt8Type, Int8Type, UInt16Type, Int16Type, + UInt32Type, Int32Type, UInt64Type, Int64Type, FloatType, + DoubleType> PrimitiveTestTypes; + 
+TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); + +TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredRead) { + this->TestSingleColumnRequiredRead(1); +} + +TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredTableRead) { + this->TestSingleColumnRequiredTableRead(1); +} + +TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedRead) { + this->TestSingleColumnRequiredRead(4); +} + +TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { + this->TestSingleColumnRequiredTableRead(4); } } // namespace parquet http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/parquet-schema-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/parquet-schema-test.cc b/cpp/src/arrow/parquet/parquet-schema-test.cc index 8de7394..819cdd3 100644 --- a/cpp/src/arrow/parquet/parquet-schema-test.cc +++ b/cpp/src/arrow/parquet/parquet-schema-test.cc @@ -183,7 +183,9 @@ class TestConvertArrowSchema : public ::testing::Test { Status ConvertSchema(const std::vector<std::shared_ptr<Field>>& fields) { arrow_schema_ = std::make_shared<Schema>(fields); - return ToParquetSchema(arrow_schema_.get(), &result_schema_); + std::shared_ptr<::parquet::WriterProperties> properties = + ::parquet::default_writer_properties(); + return ToParquetSchema(arrow_schema_.get(), *properties.get(), &result_schema_); } protected: http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/reader.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc index 3b4882d..7b05665 100644 --- a/cpp/src/arrow/parquet/reader.cc +++ b/cpp/src/arrow/parquet/reader.cc @@ -17,6 +17,7 @@ #include "arrow/parquet/reader.h" +#include <algorithm> #include <queue> #include <string> #include <vector> @@ -27,6 +28,7 @@ #include "arrow/schema.h" #include "arrow/table.h" #include 
"arrow/types/primitive.h" +#include "arrow/types/string.h" #include "arrow/util/status.h" using parquet::ColumnReader; @@ -36,6 +38,19 @@ using parquet::TypedColumnReader; namespace arrow { namespace parquet { +template <typename ArrowType> +struct ArrowTypeTraits { + typedef NumericBuilder<ArrowType> builder_type; +}; + +template <> +struct ArrowTypeTraits<BooleanType> { + typedef BooleanBuilder builder_type; +}; + +template <typename ArrowType> +using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type; + class FileReader::Impl { public: Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader); @@ -61,9 +76,45 @@ class FlatColumnReader::Impl { template <typename ArrowType, typename ParquetType> Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out); + template <typename ArrowType, typename ParquetType> + Status ReadNullableFlatBatch(const int16_t* def_levels, + typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read, + BuilderType<ArrowType>* builder); + template <typename ArrowType, typename ParquetType> + Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read, + BuilderType<ArrowType>* builder); + private: void NextRowGroup(); + template <typename InType, typename OutType> + struct can_copy_ptr { + static constexpr bool value = + std::is_same<InType, OutType>::value || + (std::is_integral<InType>{} && std::is_integral<OutType>{} && + (sizeof(InType) == sizeof(OutType))); + }; + + template <typename InType, typename OutType, + typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr> + Status ConvertPhysicalType( + const InType* in_ptr, int64_t length, const OutType** out_ptr) { + *out_ptr = reinterpret_cast<const OutType*>(in_ptr); + return Status::OK(); + } + + template <typename InType, typename OutType, + typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr> + Status ConvertPhysicalType( + const InType* in_ptr, 
int64_t length, const OutType** out_ptr) { + RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType))); + OutType* mutable_out_ptr = + reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data()); + std::copy(in_ptr, in_ptr + length, mutable_out_ptr); + *out_ptr = mutable_out_ptr; + return Status::OK(); + } + MemoryPool* pool_; const ::parquet::ColumnDescriptor* descr_; ::parquet::ParquetFileReader* reader_; @@ -156,12 +207,52 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor } template <typename ArrowType, typename ParquetType> +Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values, + int64_t values_read, BuilderType<ArrowType>* builder) { + using ArrowCType = typename ArrowType::c_type; + using ParquetCType = typename ParquetType::c_type; + + DCHECK(builder); + const ArrowCType* values_ptr; + RETURN_NOT_OK( + (ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, &values_ptr))); + RETURN_NOT_OK(builder->Append(values_ptr, values_read)); + return Status::OK(); +} + +template <typename ArrowType, typename ParquetType> +Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels, + typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read, + BuilderType<ArrowType>* builder) { + using ArrowCType = typename ArrowType::c_type; + + DCHECK(builder); + RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType))); + RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t))); + auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data()); + uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data(); + int values_idx = 0; + for (int64_t i = 0; i < levels_read; i++) { + if (def_levels[i] < descr_->max_definition_level()) { + valid_bytes[i] = 0; + } else { + valid_bytes[i] = 1; + values_ptr[i] = values[values_idx++]; + } + } + RETURN_NOT_OK(builder->Append(values_ptr, levels_read, 
valid_bytes)); + return Status::OK(); +} + +template <typename ArrowType, typename ParquetType> Status FlatColumnReader::Impl::TypedReadBatch( int batch_size, std::shared_ptr<Array>* out) { + using ParquetCType = typename ParquetType::c_type; + int values_to_read = batch_size; - NumericBuilder<ArrowType> builder(pool_, field_->type); + BuilderType<ArrowType> builder(pool_, field_->type); while ((values_to_read > 0) && column_reader_) { - values_buffer_.Resize(values_to_read * sizeof(typename ParquetType::c_type)); + values_buffer_.Resize(values_to_read * sizeof(ParquetCType)); if (descr_->max_definition_level() > 0) { def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); } @@ -169,31 +260,62 @@ Status FlatColumnReader::Impl::TypedReadBatch( int64_t values_read; int64_t levels_read; int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); - auto values = - reinterpret_cast<typename ParquetType::c_type*>(values_buffer_.mutable_data()); + auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data()); PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( values_to_read, def_levels, nullptr, values, &values_read)); values_to_read -= levels_read; if (descr_->max_definition_level() == 0) { - RETURN_NOT_OK(builder.Append(values, values_read)); + RETURN_NOT_OK( + (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder))); + } else { + // As per the definition and checks for flat columns: + // descr_->max_definition_level() == 1 + RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>( + def_levels, values, values_read, levels_read, &builder))); + } + if (!column_reader_->HasNext()) { NextRowGroup(); } + } + *out = builder.Finish(); + return Status::OK(); +} + +template <> +Status FlatColumnReader::Impl::TypedReadBatch<StringType, ::parquet::ByteArrayType>( + int batch_size, std::shared_ptr<Array>* out) { + int values_to_read = batch_size; + StringBuilder builder(pool_, field_->type); + while 
((values_to_read > 0) && column_reader_) { + values_buffer_.Resize(values_to_read * sizeof(::parquet::ByteArray)); + if (descr_->max_definition_level() > 0) { + def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)); + } + auto reader = + dynamic_cast<TypedColumnReader<::parquet::ByteArrayType>*>(column_reader_.get()); + int64_t values_read; + int64_t levels_read; + int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); + auto values = reinterpret_cast<::parquet::ByteArray*>(values_buffer_.mutable_data()); + PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch( + values_to_read, def_levels, nullptr, values, &values_read)); + values_to_read -= levels_read; + if (descr_->max_definition_level() == 0) { + for (int64_t i = 0; i < levels_read; i++) { + RETURN_NOT_OK( + builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len)); + } } else { // descr_->max_definition_level() == 1 - RETURN_NOT_OK(values_builder_buffer_.Resize( - levels_read * sizeof(typename ParquetType::c_type))); - RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t))); - auto values_ptr = reinterpret_cast<typename ParquetType::c_type*>( - values_builder_buffer_.mutable_data()); - uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data(); int values_idx = 0; for (int64_t i = 0; i < levels_read; i++) { if (def_levels[i] < descr_->max_definition_level()) { - valid_bytes[i] = 0; + RETURN_NOT_OK(builder.AppendNull()); } else { - valid_bytes[i] = 1; - values_ptr[i] = values[values_idx++]; + RETURN_NOT_OK( + builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr), + values[values_idx].len)); + values_idx++; } } - builder.Append(values_ptr, levels_read, valid_bytes); } if (!column_reader_->HasNext()) { NextRowGroup(); } } @@ -214,10 +336,18 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* } switch (field_->type->type) { + TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType) + 
TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT32, UInt32Type, ::parquet::Int32Type) TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type) TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type) TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType) TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType) + TYPED_BATCH_CASE(STRING, StringType, ::parquet::ByteArrayType) default: return Status::NotImplemented(field_->type->ToString()); } http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/schema.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/schema.cc b/cpp/src/arrow/parquet/schema.cc index c7979db..a79342a 100644 --- a/cpp/src/arrow/parquet/schema.cc +++ b/cpp/src/arrow/parquet/schema.cc @@ -42,7 +42,12 @@ namespace parquet { const auto BOOL = std::make_shared<BooleanType>(); const auto UINT8 = std::make_shared<UInt8Type>(); +const auto INT8 = std::make_shared<Int8Type>(); +const auto UINT16 = std::make_shared<UInt16Type>(); +const auto INT16 = std::make_shared<Int16Type>(); +const auto UINT32 = std::make_shared<UInt32Type>(); const auto INT32 = std::make_shared<Int32Type>(); +const auto UINT64 = std::make_shared<UInt64Type>(); const auto INT64 = std::make_shared<Int64Type>(); const auto FLOAT = std::make_shared<FloatType>(); const auto DOUBLE = std::make_shared<DoubleType>(); @@ -92,6 +97,21 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) { case LogicalType::NONE: *out = INT32; break; + case LogicalType::UINT_8: + *out = UINT8; + break; + case LogicalType::INT_8: + *out = INT8; + break; + case LogicalType::UINT_16: + *out = UINT16; + break; + case LogicalType::INT_16: + *out = 
INT16; + break; + case LogicalType::UINT_32: + *out = UINT32; + break; case LogicalType::DECIMAL: *out = MakeDecimalType(node); break; @@ -107,6 +127,9 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) { case LogicalType::NONE: *out = INT64; break; + case LogicalType::UINT_64: + *out = UINT64; + break; case LogicalType::DECIMAL: *out = MakeDecimalType(node); break; @@ -187,20 +210,21 @@ Status FromParquetSchema( } Status StructToNode(const std::shared_ptr<StructType>& type, const std::string& name, - bool nullable, NodePtr* out) { + bool nullable, const ::parquet::WriterProperties& properties, NodePtr* out) { Repetition::type repetition = Repetition::REQUIRED; if (nullable) { repetition = Repetition::OPTIONAL; } std::vector<NodePtr> children(type->num_children()); for (int i = 0; i < type->num_children(); i++) { - RETURN_NOT_OK(FieldToNode(type->child(i), &children[i])); + RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i])); } *out = GroupNode::Make(name, repetition, children); return Status::OK(); } -Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) { +Status FieldToNode(const std::shared_ptr<Field>& field, + const ::parquet::WriterProperties& properties, NodePtr* out) { LogicalType::type logical_type = LogicalType::NONE; ParquetType::type type; Repetition::type repetition = Repetition::REQUIRED; @@ -231,8 +255,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) { logical_type = LogicalType::INT_16; break; case Type::UINT32: - type = ParquetType::INT32; - logical_type = LogicalType::UINT_32; + if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) { + type = ParquetType::INT64; + } else { + type = ParquetType::INT32; + logical_type = LogicalType::UINT_32; + } break; case Type::INT32: type = ParquetType::INT32; @@ -277,7 +305,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) { break; case Type::STRUCT: { auto struct_type = 
std::static_pointer_cast<StructType>(field->type); - return StructToNode(struct_type, field->name, field->nullable, out); + return StructToNode(struct_type, field->name, field->nullable, properties, out); } break; default: // TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR @@ -287,11 +315,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field, NodePtr* out) { return Status::OK(); } -Status ToParquetSchema( - const Schema* arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out) { +Status ToParquetSchema(const Schema* arrow_schema, + const ::parquet::WriterProperties& properties, + std::shared_ptr<::parquet::SchemaDescriptor>* out) { std::vector<NodePtr> nodes(arrow_schema->num_fields()); for (int i = 0; i < arrow_schema->num_fields(); i++) { - RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), &nodes[i])); + RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i])); } NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes); http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/schema.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/schema.h b/cpp/src/arrow/parquet/schema.h index ec5f960..39bee05 100644 --- a/cpp/src/arrow/parquet/schema.h +++ b/cpp/src/arrow/parquet/schema.h @@ -21,6 +21,7 @@ #include <memory> #include "parquet/api/schema.h" +#include "parquet/api/writer.h" #include "arrow/schema.h" #include "arrow/type.h" @@ -36,10 +37,12 @@ Status NodeToField(const ::parquet::schema::NodePtr& node, std::shared_ptr<Field Status FromParquetSchema( const ::parquet::SchemaDescriptor* parquet_schema, std::shared_ptr<Schema>* out); -Status FieldToNode(const std::shared_ptr<Field>& field, ::parquet::schema::NodePtr* out); +Status FieldToNode(const std::shared_ptr<Field>& field, + const ::parquet::WriterProperties& properties, ::parquet::schema::NodePtr* out); -Status ToParquetSchema( - const Schema* 
arrow_schema, std::shared_ptr<::parquet::SchemaDescriptor>* out); +Status ToParquetSchema(const Schema* arrow_schema, + const ::parquet::WriterProperties& properties, + std::shared_ptr<::parquet::SchemaDescriptor>* out); } // namespace parquet http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/test-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/test-util.h b/cpp/src/arrow/parquet/test-util.h index cc8723b..68a7fb9 100644 --- a/cpp/src/arrow/parquet/test-util.h +++ b/cpp/src/arrow/parquet/test-util.h @@ -18,26 +18,90 @@ #include <string> #include <vector> +#include "arrow/test-util.h" #include "arrow/types/primitive.h" +#include "arrow/types/string.h" namespace arrow { namespace parquet { template <typename ArrowType> -std::shared_ptr<PrimitiveArray> NonNullArray( - size_t size, typename ArrowType::c_type value) { - std::vector<typename ArrowType::c_type> values(size, value); +using is_arrow_float = std::is_floating_point<typename ArrowType::c_type>; + +template <typename ArrowType> +using is_arrow_int = std::is_integral<typename ArrowType::c_type>; + +template <typename ArrowType> +using is_arrow_string = std::is_same<ArrowType, StringType>; + +template <class ArrowType> +typename std::enable_if<is_arrow_float<ArrowType>::value, + std::shared_ptr<PrimitiveArray>>::type +NonNullArray(size_t size) { + std::vector<typename ArrowType::c_type> values; + ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values); NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>()); builder.Append(values.data(), values.size()); return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); } -// This helper function only supports (size/2) nulls yet. 
+template <class ArrowType> +typename std::enable_if<is_arrow_int<ArrowType>::value, + std::shared_ptr<PrimitiveArray>>::type +NonNullArray(size_t size) { + std::vector<typename ArrowType::c_type> values; + ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values); + NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>()); + builder.Append(values.data(), values.size()); + return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); +} + +template <class ArrowType> +typename std::enable_if<is_arrow_string<ArrowType>::value, + std::shared_ptr<StringArray>>::type +NonNullArray(size_t size) { + StringBuilder builder(default_memory_pool(), std::make_shared<StringType>()); + for (size_t i = 0; i < size; i++) { + builder.Append("test-string"); + } + return std::static_pointer_cast<StringArray>(builder.Finish()); +} + +template <> +std::shared_ptr<PrimitiveArray> NonNullArray<BooleanType>(size_t size) { + std::vector<uint8_t> values; + ::arrow::test::randint<uint8_t>(size, 0, 1, &values); + BooleanBuilder builder(default_memory_pool(), std::make_shared<BooleanType>()); + builder.Append(values.data(), values.size()); + return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); +} + +// This helper function only supports (size/2) nulls. 
+template <typename ArrowType> +typename std::enable_if<is_arrow_float<ArrowType>::value, + std::shared_ptr<PrimitiveArray>>::type +NullableArray(size_t size, size_t num_nulls) { + std::vector<typename ArrowType::c_type> values; + ::arrow::test::random_real<typename ArrowType::c_type>(size, 0, 0, 1, &values); + std::vector<uint8_t> valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>()); + builder.Append(values.data(), values.size(), valid_bytes.data()); + return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); +} + +// This helper function only supports (size/2) nulls. template <typename ArrowType> -std::shared_ptr<PrimitiveArray> NullableArray( - size_t size, typename ArrowType::c_type value, size_t num_nulls) { - std::vector<typename ArrowType::c_type> values(size, value); +typename std::enable_if<is_arrow_int<ArrowType>::value, + std::shared_ptr<PrimitiveArray>>::type +NullableArray(size_t size, size_t num_nulls) { + std::vector<typename ArrowType::c_type> values; + ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values); std::vector<uint8_t> valid_bytes(size, 1); for (size_t i = 0; i < num_nulls; i++) { @@ -49,14 +113,49 @@ std::shared_ptr<PrimitiveArray> NullableArray( return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); } -std::shared_ptr<Column> MakeColumn(const std::string& name, - const std::shared_ptr<PrimitiveArray>& array, bool nullable) { +// This helper function only supports (size/2) nulls yet. 
+template <typename ArrowType> +typename std::enable_if<is_arrow_string<ArrowType>::value, + std::shared_ptr<StringArray>>::type +NullableArray(size_t size, size_t num_nulls) { + std::vector<uint8_t> valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + StringBuilder builder(default_memory_pool(), std::make_shared<StringType>()); + for (size_t i = 0; i < size; i++) { + builder.Append("test-string"); + } + return std::static_pointer_cast<StringArray>(builder.Finish()); +} + +// This helper function only supports (size/2) nulls yet. +template <> +std::shared_ptr<PrimitiveArray> NullableArray<BooleanType>( + size_t size, size_t num_nulls) { + std::vector<uint8_t> values; + ::arrow::test::randint<uint8_t>(size, 0, 1, &values); + std::vector<uint8_t> valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + BooleanBuilder builder(default_memory_pool(), std::make_shared<BooleanType>()); + builder.Append(values.data(), values.size(), valid_bytes.data()); + return std::static_pointer_cast<PrimitiveArray>(builder.Finish()); +} + +std::shared_ptr<Column> MakeColumn( + const std::string& name, const std::shared_ptr<Array>& array, bool nullable) { auto field = std::make_shared<Field>(name, array->type(), nullable); return std::make_shared<Column>(field, array); } std::shared_ptr<Table> MakeSimpleTable( - const std::shared_ptr<PrimitiveArray>& values, bool nullable) { + const std::shared_ptr<Array>& values, bool nullable) { std::shared_ptr<Column> column = MakeColumn("col", values, nullable); std::vector<std::shared_ptr<Column>> columns({column}); std::vector<std::shared_ptr<Field>> fields({column->field()}); @@ -72,6 +171,23 @@ void ExpectArray(T* expected, Array* result) { } } +template <typename ArrowType> +void ExpectArray(typename ArrowType::c_type* expected, Array* result) { + PrimitiveArray* p_array = static_cast<PrimitiveArray*>(result); + for (int64_t i = 0; i < result->length(); 
i++) { + EXPECT_EQ(expected[i], + reinterpret_cast<const typename ArrowType::c_type*>(p_array->data()->data())[i]); + } +} + +template <> +void ExpectArray<BooleanType>(uint8_t* expected, Array* result) { + BooleanBuilder builder(default_memory_pool(), std::make_shared<BooleanType>()); + builder.Append(expected, result->length()); + std::shared_ptr<Array> expected_array = builder.Finish(); + EXPECT_TRUE(result->Equals(expected_array)); +} + } // namespace parquet } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/writer.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc index 4005e3b..63449bb 100644 --- a/cpp/src/arrow/parquet/writer.cc +++ b/cpp/src/arrow/parquet/writer.cc @@ -25,11 +25,13 @@ #include "arrow/table.h" #include "arrow/types/construct.h" #include "arrow/types/primitive.h" +#include "arrow/types/string.h" #include "arrow/parquet/schema.h" #include "arrow/parquet/utils.h" #include "arrow/util/status.h" using parquet::ParquetFileWriter; +using parquet::ParquetVersion; using parquet::schema::GroupNode; namespace arrow { @@ -41,10 +43,40 @@ class FileWriter::Impl { Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer); Status NewRowGroup(int64_t chunk_size); - template <typename ParquetType> + template <typename ParquetType, typename ArrowType> Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data, int64_t offset, int64_t length); + + // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary + // buffer + template <typename InType, typename OutType> + struct can_copy_ptr { + static constexpr bool value = + std::is_same<InType, OutType>::value || + (std::is_integral<InType>{} && std::is_integral<OutType>{} && + (sizeof(InType) == sizeof(OutType))); + }; + + template <typename InType, typename OutType, + typename 
std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr> + Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) { + *out_ptr = reinterpret_cast<const OutType*>(in_ptr); + return Status::OK(); + } + + template <typename InType, typename OutType, + typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr> + Status ConvertPhysicalType( + const InType* in_ptr, int64_t length, const OutType** out_ptr) { + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType))); + OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data()); + std::copy(in_ptr, in_ptr + length, mutable_out_ptr); + *out_ptr = mutable_out_ptr; + return Status::OK(); + } + Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length); + Status WriteFlatColumnChunk(const StringArray* data, int64_t offset, int64_t length); Status Close(); virtual ~Impl() {} @@ -53,6 +85,8 @@ class FileWriter::Impl { friend class FileWriter; MemoryPool* pool_; + // Buffer used for storing the data of an array converted to the physical type + // as expected by parquet-cpp. 
PoolBuffer data_buffer_; PoolBuffer def_levels_buffer_; std::unique_ptr<::parquet::ParquetFileWriter> writer_; @@ -72,36 +106,95 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) { return Status::OK(); } -template <typename ParquetType> +template <typename ParquetType, typename ArrowType> Status FileWriter::Impl::TypedWriteBatch(::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset, int64_t length) { - // TODO: DCHECK((offset + length) <= data->length()); - auto data_ptr = - reinterpret_cast<const typename ParquetType::c_type*>(data->data()->data()) + - offset; + using ArrowCType = typename ArrowType::c_type; + using ParquetCType = typename ParquetType::c_type; + + DCHECK((offset + length) <= data->length()); + auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset; auto writer = reinterpret_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer); if (writer->descr()->max_definition_level() == 0) { // no nulls, just dump the data - PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_ptr)); + const ParquetCType* data_writer_ptr; + RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>( + data_ptr, length, &data_writer_ptr))); + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr)); } else if (writer->descr()->max_definition_level() == 1) { RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); if (data->null_count() == 0) { std::fill(def_levels_ptr, def_levels_ptr + length, 1); - PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, data_ptr)); + const ParquetCType* data_writer_ptr; + RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>( + data_ptr, length, &data_writer_ptr))); + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr)); } else { - 
RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(typename ParquetType::c_type))); - auto buffer_ptr = - reinterpret_cast<typename ParquetType::c_type*>(data_buffer_.mutable_data()); + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType))); + auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data()); int buffer_idx = 0; for (int i = 0; i < length; i++) { if (data->IsNull(offset + i)) { def_levels_ptr[i] = 0; } else { def_levels_ptr[i] = 1; - buffer_ptr[buffer_idx++] = data_ptr[i]; + buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]); + } + } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); + } + } else { + return Status::NotImplemented("no support for max definition level > 1 yet"); + } + PARQUET_CATCH_NOT_OK(writer->Close()); + return Status::OK(); +} + +// This specialization seems quite similar but it significantly differs in two points: +// * offset is added at the latest possible time to the pointer as we have sub-byte access +// * Arrow data is stored bitwise thus we cannot use std::copy to transform from +// ArrowType::c_type to ParquetType::c_type +template <> +Status FileWriter::Impl::TypedWriteBatch<::parquet::BooleanType, BooleanType>( + ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset, + int64_t length) { + DCHECK((offset + length) <= data->length()); + RETURN_NOT_OK(data_buffer_.Resize(length)); + auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data()); + auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data()); + auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::BooleanType>*>( + column_writer); + if (writer->descr()->max_definition_level() == 0) { + // no nulls, just dump the data + for (int64_t i = 0; i < length; i++) { + buffer_ptr[i] = util::get_bit(data_ptr, offset + i); + } + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr)); + } else if 
(writer->descr()->max_definition_level() == 1) { + RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); + int16_t* def_levels_ptr = + reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); + if (data->null_count() == 0) { + std::fill(def_levels_ptr, def_levels_ptr + length, 1); + for (int64_t i = 0; i < length; i++) { + buffer_ptr[i] = util::get_bit(data_ptr, offset + i); + } + // TODO(PARQUET-644): write boolean values as a packed bitmap + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); + } else { + int buffer_idx = 0; + for (int i = 0; i < length; i++) { + if (data->IsNull(offset + i)) { + def_levels_ptr[i] = 0; + } else { + def_levels_ptr[i] = 1; + buffer_ptr[buffer_idx++] = util::get_bit(data_ptr, offset + i); } } PARQUET_CATCH_NOT_OK( @@ -120,9 +213,9 @@ Status FileWriter::Impl::Close() { return Status::OK(); } -#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ - case Type::ENUM: \ - return TypedWriteBatch<ParquetType>(writer, data, offset, length); \ +#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ + case Type::ENUM: \ + return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \ break; Status FileWriter::Impl::WriteFlatColumnChunk( @@ -130,15 +223,76 @@ Status FileWriter::Impl::WriteFlatColumnChunk( ::parquet::ColumnWriter* writer; PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn()); switch (data->type_enum()) { - TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) - TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type) - TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType) - TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType) + TYPED_BATCH_CASE(BOOL, BooleanType, ::parquet::BooleanType) + TYPED_BATCH_CASE(UINT8, UInt8Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT8, Int8Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT16, UInt16Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(INT16, Int16Type, ::parquet::Int32Type) + case 
Type::UINT32: + if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) { + // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need + // to use the larger Int64Type to store them lossless. + return TypedWriteBatch<::parquet::Int64Type, UInt32Type>( + writer, data, offset, length); + } else { + return TypedWriteBatch<::parquet::Int32Type, UInt32Type>( + writer, data, offset, length); + } + TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type) + TYPED_BATCH_CASE(UINT64, UInt64Type, ::parquet::Int64Type) + TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type) + TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType) + TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType) default: return Status::NotImplemented(data->type()->ToString()); } } +Status FileWriter::Impl::WriteFlatColumnChunk( + const StringArray* data, int64_t offset, int64_t length) { + ::parquet::ColumnWriter* column_writer; + PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn()); + DCHECK((offset + length) <= data->length()); + RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(::parquet::ByteArray))); + auto buffer_ptr = reinterpret_cast<::parquet::ByteArray*>(data_buffer_.mutable_data()); + auto values = std::dynamic_pointer_cast<PrimitiveArray>(data->values()); + auto data_ptr = reinterpret_cast<const uint8_t*>(values->data()->data()); + DCHECK(values != nullptr); + auto writer = reinterpret_cast<::parquet::TypedColumnWriter<::parquet::ByteArrayType>*>( + column_writer); + if (writer->descr()->max_definition_level() > 0) { + RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); + } + int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); + if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) { + // no nulls, just dump the data + for (int64_t i = 0; i < length; i++) { + buffer_ptr[i] = ::parquet::ByteArray( + data->value_length(i + offset), data_ptr + data->value_offset(i)); 
+ } + if (writer->descr()->max_definition_level() > 0) { + std::fill(def_levels_ptr, def_levels_ptr + length, 1); + } + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); + } else if (writer->descr()->max_definition_level() == 1) { + int buffer_idx = 0; + for (int64_t i = 0; i < length; i++) { + if (data->IsNull(offset + i)) { + def_levels_ptr[i] = 0; + } else { + def_levels_ptr[i] = 1; + buffer_ptr[buffer_idx++] = ::parquet::ByteArray( + data->value_length(i + offset), data_ptr + data->value_offset(i + offset)); + } + } + PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); + } else { + return Status::NotImplemented("no support for max definition level > 1 yet"); + } + PARQUET_CATCH_NOT_OK(writer->Close()); + return Status::OK(); +} + FileWriter::FileWriter( MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer) : impl_(new FileWriter::Impl(pool, std::move(writer))) {} @@ -148,10 +302,20 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) { } Status FileWriter::WriteFlatColumnChunk( - const PrimitiveArray* data, int64_t offset, int64_t length) { + const Array* array, int64_t offset, int64_t length) { int64_t real_length = length; - if (length == -1) { real_length = data->length(); } - return impl_->WriteFlatColumnChunk(data, offset, real_length); + if (length == -1) { real_length = array->length(); } + if (array->type_enum() == Type::STRING) { + auto string_array = dynamic_cast<const StringArray*>(array); + DCHECK(string_array); + return impl_->WriteFlatColumnChunk(string_array, offset, real_length); + } else { + auto primitive_array = dynamic_cast<const PrimitiveArray*>(array); + if (!primitive_array) { + return Status::NotImplemented("Table must consist of PrimitiveArray instances"); + } + return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length); + } } Status FileWriter::Close() { @@ -165,40 +329,30 @@ MemoryPool* FileWriter::memory_pool() const { 
FileWriter::~FileWriter() {} Status WriteFlatTable(const Table* table, MemoryPool* pool, - std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size) { + const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size, + const std::shared_ptr<::parquet::WriterProperties>& properties) { std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema; - RETURN_NOT_OK(ToParquetSchema(table->schema().get(), &parquet_schema)); + RETURN_NOT_OK( + ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema)); auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema()); std::unique_ptr<ParquetFileWriter> parquet_writer = - ParquetFileWriter::Open(sink, schema_node); + ParquetFileWriter::Open(sink, schema_node, properties); FileWriter writer(pool, std::move(parquet_writer)); - // TODO: Support writing chunked arrays. + // TODO(ARROW-232) Support writing chunked arrays. for (int i = 0; i < table->num_columns(); i++) { if (table->column(i)->data()->num_chunks() != 1) { return Status::NotImplemented("No support for writing chunked arrays yet."); } } - // Cast to PrimitiveArray instances as we work with them. 
- std::vector<std::shared_ptr<PrimitiveArray>> arrays(table->num_columns()); - for (int i = 0; i < table->num_columns(); i++) { - // num_chunks == 1 as per above loop - std::shared_ptr<Array> array = table->column(i)->data()->chunk(0); - auto primitive_array = std::dynamic_pointer_cast<PrimitiveArray>(array); - if (!primitive_array) { - PARQUET_IGNORE_NOT_OK(writer.Close()); - return Status::NotImplemented("Table must consist of PrimitiveArray instances"); - } - arrays[i] = primitive_array; - } - for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) { int64_t offset = chunk * chunk_size; int64_t size = std::min(chunk_size, table->num_rows() - offset); RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close())); for (int i = 0; i < table->num_columns(); i++) { - RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(arrays[i].get(), offset, size), + std::shared_ptr<Array> array = table->column(i)->data()->chunk(0); + RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size), PARQUET_IGNORE_NOT_OK(writer.Close())); } } http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/parquet/writer.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h index 93693f5..cfd80d8 100644 --- a/cpp/src/arrow/parquet/writer.h +++ b/cpp/src/arrow/parquet/writer.h @@ -25,10 +25,12 @@ namespace arrow { +class Array; class MemoryPool; class PrimitiveArray; class RowBatch; class Status; +class StringArray; class Table; namespace parquet { @@ -43,8 +45,7 @@ class FileWriter { FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer); Status NewRowGroup(int64_t chunk_size); - Status WriteFlatColumnChunk( - const PrimitiveArray* data, int64_t offset = 0, int64_t length = -1); + Status WriteFlatColumnChunk(const Array* data, int64_t offset = 0, int64_t length = -1); Status Close(); virtual ~FileWriter(); @@ -62,7 
+63,9 @@ class FileWriter { * The table shall only consist of nullable, non-repeated columns of primitive type. */ Status WriteFlatTable(const Table* table, MemoryPool* pool, - std::shared_ptr<::parquet::OutputStream> sink, int64_t chunk_size); + const std::shared_ptr<::parquet::OutputStream>& sink, int64_t chunk_size, + const std::shared_ptr<::parquet::WriterProperties>& properties = + ::parquet::default_writer_properties()); } // namespace parquet http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/test-util.h b/cpp/src/arrow/test-util.h index 2f81161..055dac7 100644 --- a/cpp/src/arrow/test-util.h +++ b/cpp/src/arrow/test-util.h @@ -50,6 +50,8 @@ if (!s.ok()) { FAIL() << s.ToString(); } \ } while (0) +#define ASSERT_OK_NO_THROW(expr) ASSERT_NO_THROW(ASSERT_OK(expr)) + #define EXPECT_OK(expr) \ do { \ Status s = (expr); \ http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/cpp/src/arrow/types/primitive.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/types/primitive.cc b/cpp/src/arrow/types/primitive.cc index 08fc847..f4b47f9 100644 --- a/cpp/src/arrow/types/primitive.cc +++ b/cpp/src/arrow/types/primitive.cc @@ -133,6 +133,11 @@ Status PrimitiveBuilder<BooleanType>::Append( RETURN_NOT_OK(Reserve(length)); for (int i = 0; i < length; ++i) { + // Skip reading from uninitialised memory + // TODO: This actually is only to keep valgrind happy but may or may not + // have a performance impact. 
+ if ((valid_bytes != nullptr) && !valid_bytes[i]) continue; + if (values[i] > 0) { util::set_bit(raw_data_, length_ + i); } else { http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/python/pyarrow/includes/parquet.pxd ---------------------------------------------------------------------- diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd index 0918344..a2f83ea 100644 --- a/python/pyarrow/includes/parquet.pxd +++ b/python/pyarrow/includes/parquet.pxd @@ -32,6 +32,10 @@ cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil: pass cdef extern from "parquet/api/schema.h" namespace "parquet" nogil: + enum ParquetVersion" parquet::ParquetVersion::type": + PARQUET_1_0" parquet::ParquetVersion::PARQUET_1_0" + PARQUET_2_0" parquet::ParquetVersion::PARQUET_2_0" + cdef cppclass SchemaDescriptor: shared_ptr[Node] schema() GroupNode* group() @@ -80,6 +84,11 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil: LocalFileOutputStream(const c_string& path) void Close() + cdef cppclass WriterProperties: + cppclass Builder: + Builder* version(ParquetVersion version) + shared_ptr[WriterProperties] build() + cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil: cdef cppclass FileReader: @@ -93,5 +102,7 @@ cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil: cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil: - cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, shared_ptr[OutputStream] sink, int64_t chunk_size) + cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool, + const shared_ptr[OutputStream]& sink, int64_t chunk_size, + const shared_ptr[WriterProperties]& properties) http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/python/pyarrow/parquet.pyx ---------------------------------------------------------------------- diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx index 
3d5355e..0b2b208 100644 --- a/python/pyarrow/parquet.pyx +++ b/python/pyarrow/parquet.pyx @@ -24,6 +24,7 @@ cimport pyarrow.includes.pyarrow as pyarrow from pyarrow.includes.parquet cimport * from pyarrow.compat import tobytes +from pyarrow.error import ArrowException from pyarrow.error cimport check_cstatus from pyarrow.table cimport Table @@ -42,11 +43,13 @@ def read_table(filename, columns=None): # in Cython (due to missing rvalue support) reader = unique_ptr[FileReader](new FileReader(default_memory_pool(), ParquetFileReader.OpenFile(tobytes(filename)))) - check_cstatus(reader.get().ReadFlatTable(&ctable)) + with nogil: + check_cstatus(reader.get().ReadFlatTable(&ctable)) + table.init(ctable) return table -def write_table(table, filename, chunk_size=None): +def write_table(table, filename, chunk_size=None, version=None): """ Write a Table to Parquet format @@ -56,16 +59,29 @@ def write_table(table, filename, chunk_size=None): filename : string chunk_size : int The maximum number of rows in each Parquet RowGroup + version : {"1.0", "2.0"}, default "1.0" + The Parquet format version, defaults to 1.0 """ cdef Table table_ = table cdef CTable* ctable_ = table_.table cdef shared_ptr[OutputStream] sink + cdef WriterProperties.Builder properties_builder cdef int64_t chunk_size_ = 0 if chunk_size is None: chunk_size_ = min(ctable_.num_rows(), int(2**16)) else: chunk_size_ = chunk_size + if version is not None: + if version == "1.0": + properties_builder.version(PARQUET_1_0) + elif version == "2.0": + properties_builder.version(PARQUET_2_0) + else: + raise ArrowException("Unsupported Parquet format version") + sink.reset(new LocalFileOutputStream(tobytes(filename))) - check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, chunk_size_)) + with nogil: + check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink, + chunk_size_, properties_builder.build())) http://git-wip-us.apache.org/repos/asf/arrow/blob/2f52cf4e/python/pyarrow/tests/test_parquet.py 
---------------------------------------------------------------------- diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index d92cf4c..de9cfbb 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -42,18 +42,55 @@ def test_single_pylist_column_roundtrip(tmpdir): data_read = col_read.data.chunk(0) assert data_written.equals(data_read) -def test_pandas_rountrip(tmpdir): +def test_pandas_parquet_2_0_rountrip(tmpdir): size = 10000 + np.random.seed(0) df = pd.DataFrame({ + 'uint8': np.arange(size, dtype=np.uint8), + 'uint16': np.arange(size, dtype=np.uint16), + 'uint32': np.arange(size, dtype=np.uint32), + 'uint64': np.arange(size, dtype=np.uint64), + 'int8': np.arange(size, dtype=np.int16), + 'int16': np.arange(size, dtype=np.int16), 'int32': np.arange(size, dtype=np.int32), 'int64': np.arange(size, dtype=np.int64), 'float32': np.arange(size, dtype=np.float32), - 'float64': np.arange(size, dtype=np.float64) + 'float64': np.arange(size, dtype=np.float64), + 'bool': np.random.randn(size) > 0, + 'str': [str(x) for x in range(size)], + 'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None] }) filename = tmpdir.join('pandas_rountrip.parquet') arrow_table = A.from_pandas_dataframe(df) - A.parquet.write_table(arrow_table, filename.strpath) + A.parquet.write_table(arrow_table, filename.strpath, version="2.0") table_read = pyarrow.parquet.read_table(filename.strpath) df_read = table_read.to_pandas() pdt.assert_frame_equal(df, df_read) +def test_pandas_parquet_1_0_rountrip(tmpdir): + size = 10000 + np.random.seed(0) + df = pd.DataFrame({ + 'uint8': np.arange(size, dtype=np.uint8), + 'uint16': np.arange(size, dtype=np.uint16), + 'uint32': np.arange(size, dtype=np.uint32), + 'uint64': np.arange(size, dtype=np.uint64), + 'int8': np.arange(size, dtype=np.int16), + 'int16': np.arange(size, dtype=np.int16), + 'int32': np.arange(size, dtype=np.int32), + 'int64': np.arange(size, 
dtype=np.int64), + 'float32': np.arange(size, dtype=np.float32), + 'float64': np.arange(size, dtype=np.float64), + 'bool': np.random.randn(size) > 0 + }) + filename = tmpdir.join('pandas_rountrip.parquet') + arrow_table = A.from_pandas_dataframe(df) + A.parquet.write_table(arrow_table, filename.strpath, version="1.0") + table_read = pyarrow.parquet.read_table(filename.strpath) + df_read = table_read.to_pandas() + + # We pass uint32_t as int64_t if we write Parquet version 1.0 + df['uint32'] = df['uint32'].values.astype(np.int64) + + pdt.assert_frame_equal(df, df_read) +