Repository: parquet-cpp Updated Branches: refs/heads/master 893af978a -> 84ffae5c6
PARQUET-965: Add FIXED_LEN_BYTE_ARRAY read and write support in parquet-arrow The decimal support is left for further commits. This may also help ARROW-901 Author: Xianjin YE <[email protected]> Closes #315 from advancedxy/PARQUET-965 and squashes the following commits: 402b059 [Xianjin YE] Rewording some comments and minor fix to avoid realloc in Resize 30f3705 [Xianjin YE] Fix FLBA to fixed_size_binary schema convert test failure. e48a5f5 [Xianjin YE] Add support for FIXED_LEN_BYTE_ARRAY read and write support in parquet-arrow. Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/84ffae5c Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/84ffae5c Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/84ffae5c Branch: refs/heads/master Commit: 84ffae5c60fd5c5a31f62110712d6d38807b68da Parents: 893af97 Author: Xianjin YE <[email protected]> Authored: Tue May 2 09:26:41 2017 -0400 Committer: Wes McKinney <[email protected]> Committed: Tue May 2 09:26:41 2017 -0400 ---------------------------------------------------------------------- src/parquet/arrow/arrow-reader-writer-test.cc | 23 ++++++-- src/parquet/arrow/arrow-schema-test.cc | 3 +- src/parquet/arrow/reader.cc | 61 ++++++++++++++++++++++ src/parquet/arrow/reader.h | 2 +- src/parquet/arrow/schema.cc | 8 ++- src/parquet/arrow/test-util.h | 46 ++++++++++++++++ src/parquet/arrow/writer.cc | 34 ++++++++++++ 7 files changed, 171 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 1d87606..4c351b4 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ 
b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -128,6 +128,8 @@ ParquetType::type get_physical_type(const ::arrow::DataType& type) { return ParquetType::BYTE_ARRAY; case ArrowId::STRING: return ParquetType::BYTE_ARRAY; + case ArrowId::FIXED_SIZE_BINARY: + return ParquetType::FIXED_LEN_BYTE_ARRAY; case ArrowId::DATE32: return ParquetType::INT32; case ArrowId::DATE64: @@ -265,9 +267,15 @@ struct test_traits<::arrow::BinaryType> { static std::string const value; }; +template <> +struct test_traits<::arrow::FixedSizeBinaryType> { + static constexpr ParquetType::type parquet_enum = ParquetType::FIXED_LEN_BYTE_ARRAY; + static std::string const value; +}; + const std::string test_traits<::arrow::StringType>::value("Test"); const std::string test_traits<::arrow::BinaryType>::value("\x00\x01\x02\x03"); - +const std::string test_traits<::arrow::FixedSizeBinaryType>::value("Fixed"); template <typename T> using ParquetDataType = DataType<test_traits<T>::parquet_enum>; @@ -306,8 +314,17 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads, static std::shared_ptr<GroupNode> MakeSimpleSchema( const ::arrow::DataType& type, Repetition::type repetition) { + int byte_width; + // Decimal is not implemented yet. 
+ switch (type.id()) { + case ::arrow::Type::FIXED_SIZE_BINARY: + byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width(); + break; + default: + byte_width = -1; + } auto pnode = PrimitiveNode::Make( - "column1", repetition, get_physical_type(type), get_logical_type(type)); + "column1", repetition, get_physical_type(type), get_logical_type(type), byte_width); NodePtr node_ = GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode})); return std::static_pointer_cast<GroupNode>(node_); @@ -423,7 +440,7 @@ class TestParquetIO : public ::testing::Test { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::Date32Type, ::arrow::FloatType, ::arrow::DoubleType, - ::arrow::StringType, ::arrow::BinaryType> + ::arrow::StringType, ::arrow::BinaryType, ::arrow::FixedSizeBinaryType> TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/arrow-schema-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index a8a8c09..b56646e 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -151,7 +151,8 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) { parquet_fields.push_back(PrimitiveNode::Make("flba-binary", Repetition::OPTIONAL, ParquetType::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE, 12)); - arrow_fields.push_back(std::make_shared<Field>("flba-binary", BINARY)); + arrow_fields.push_back( + std::make_shared<Field>("flba-binary", ::arrow::fixed_size_binary(12))); auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields); ASSERT_OK(ConvertSchema(parquet_fields)); 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 06e5e22..6d8c4ff 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -228,6 +228,9 @@ class ColumnReader::Impl { Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out); template <typename ArrowType> + Status ReadFLBABatch(int batch_size, int byte_width, std::shared_ptr<Array>* out); + + template <typename ArrowType> Status InitDataBuffer(int batch_size); Status InitValidBits(int batch_size); template <typename ArrowType, typename ParquetType> @@ -1019,6 +1022,58 @@ Status ColumnReader::Impl::ReadByteArrayBatch( return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out); } +template <typename ArrowType> +Status ColumnReader::Impl::ReadFLBABatch( + int batch_size, int byte_width, std::shared_ptr<Array>* out) { + using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType; + int total_levels_read = 0; + if (descr_->max_definition_level() > 0) { + RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false)); + } + if (descr_->max_repetition_level() > 0) { + RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false)); + } + int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); + int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()); + + int values_to_read = batch_size; + BuilderType builder(pool_, ::arrow::fixed_size_binary(byte_width)); + while ((values_to_read > 0) && column_reader_) { + RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(FLBA), false)); + auto reader = dynamic_cast<TypedColumnReader<FLBAType>*>(column_reader_.get()); + int64_t values_read; + int64_t levels_read; + auto values = reinterpret_cast<FLBA*>(values_buffer_.mutable_data()); + 
PARQUET_CATCH_NOT_OK( + levels_read = reader->ReadBatch(values_to_read, def_levels + total_levels_read, + rep_levels + total_levels_read, values, &values_read)); + values_to_read -= levels_read; + if (descr_->max_definition_level() == 0) { + for (int64_t i = 0; i < levels_read; i++) { + RETURN_NOT_OK(builder.Append(values[i].ptr)); + } + } else { + int values_idx = 0; + int nullable_elements = descr_->schema_node()->is_optional(); + for (int64_t i = 0; i < levels_read; i++) { + if (nullable_elements && + (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) { + RETURN_NOT_OK(builder.AppendNull()); + } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) { + RETURN_NOT_OK(builder.Append(values[values_idx].ptr)); + values_idx++; + } + } + total_levels_read += levels_read; + } + if (!column_reader_->HasNext()) { NextRowGroup(); } + } + + RETURN_NOT_OK(builder.Finish(out)); + // Check if we should transform this array into an list array. + return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out); +} + template <> Status ColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>( int batch_size, std::shared_ptr<Array>* out) { @@ -1059,6 +1114,12 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType) TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type) TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type) + case ::arrow::Type::FIXED_SIZE_BINARY: { + int32_t byte_width = + static_cast<::arrow::FixedSizeBinaryType*>(field_->type().get())->byte_width(); + return ReadFLBABatch<::arrow::FixedSizeBinaryType>(batch_size, byte_width, out); + break; + } case ::arrow::Type::TIMESTAMP: { ::arrow::TimestampType* timestamp_type = static_cast<::arrow::TimestampType*>(field_->type().get()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/reader.h 
---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h index f12acaf..24601b8 100644 --- a/src/parquet/arrow/reader.h +++ b/src/parquet/arrow/reader.h @@ -33,7 +33,7 @@ class MemoryPool; class RowBatch; class Status; class Table; -} +} // namespace arrow namespace parquet { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 4326161..6aeff17 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -73,7 +73,7 @@ static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) { static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) { switch (node->logical_type()) { case LogicalType::NONE: - *out = ::arrow::binary(); + *out = ::arrow::fixed_size_binary(node->type_length()); break; case LogicalType::DECIMAL: *out = MakeDecimalType(node); @@ -469,6 +469,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field, case ArrowType::BINARY: type = ParquetType::BYTE_ARRAY; break; + case ArrowType::FIXED_SIZE_BINARY: { + type = ParquetType::FIXED_LEN_BYTE_ARRAY; + auto fixed_size_binary_type = + static_cast<::arrow::FixedSizeBinaryType*>(field->type().get()); + length = fixed_size_binary_type->byte_width(); + } break; case ArrowType::DATE32: type = ParquetType::INT32; logical_type = LogicalType::DATE; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 8bcd314..388250e 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -44,6 +44,9 @@ template <typename ArrowType> using is_arrow_binary = std::is_same<ArrowType, ::arrow::BinaryType>; template 
<typename ArrowType> +using is_arrow_fixed_size_binary = std::is_same<ArrowType, ::arrow::FixedSizeBinaryType>; + +template <typename ArrowType> using is_arrow_bool = std::is_same<ArrowType, ::arrow::BooleanType>; template <class ArrowType> @@ -98,6 +101,19 @@ NonNullArray(size_t size, std::shared_ptr<Array>* out) { return builder.Finish(out); } +template <typename ArrowType> +typename std::enable_if<is_arrow_fixed_size_binary<ArrowType>::value, Status>::type +NonNullArray(size_t size, std::shared_ptr<Array>* out) { + using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType; + // set byte_width to the length of "fixed": 5 + // todo: find a way to generate test data with more diversity. + BuilderType builder(::arrow::default_memory_pool(), ::arrow::fixed_size_binary(5)); + for (size_t i = 0; i < size; i++) { + builder.Append("fixed"); + } + return builder.Finish(out); +} + template <class ArrowType> typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NonNullArray( size_t size, std::shared_ptr<Array>* out) { @@ -201,6 +217,36 @@ NullableArray( return builder.Finish(out); } +// This helper function only supports (size/2) nulls yet, +// same as NullableArray<String|Binary>(..) 
+template <typename ArrowType> +typename std::enable_if<is_arrow_fixed_size_binary<ArrowType>::value, Status>::type +NullableArray( + size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<::arrow::Array>* out) { + std::vector<uint8_t> valid_bytes(size, 1); + + for (size_t i = 0; i < num_nulls; i++) { + valid_bytes[i * 2] = 0; + } + + using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType; + const int byte_width = 10; + BuilderType builder( + ::arrow::default_memory_pool(), ::arrow::fixed_size_binary(byte_width)); + + const int kBufferSize = byte_width; + uint8_t buffer[kBufferSize]; + for (size_t i = 0; i < size; i++) { + if (!valid_bytes[i]) { + builder.AppendNull(); + } else { + ::arrow::test::random_bytes(kBufferSize, seed + i, buffer); + builder.Append(buffer); + } + } + return builder.Finish(out); +} + // This helper function only supports (size/2) nulls yet. template <class ArrowType> typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableArray( http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84ffae5c/src/parquet/arrow/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index 8b0a271..2ebeb4a 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -31,6 +31,7 @@ using arrow::Array; using arrow::BinaryArray; +using arrow::FixedSizeBinaryArray; using arrow::BooleanArray; using arrow::Int16Array; using arrow::Int16Builder; @@ -549,6 +550,38 @@ Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>( return Status::OK(); } +template <> +Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>( + ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels, + const int16_t* def_levels, const int16_t* rep_levels) { + RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false)); + auto data = 
static_cast<const FixedSizeBinaryArray*>(array.get()); + auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data()); + + auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer); + + if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) { + // no nulls, just dump the data + // todo(advancedxy): use a writeBatch to avoid this step + for (int64_t i = 0; i < data->length(); i++) { + buffer_ptr[i] = FixedLenByteArray(data->GetValue(i)); + } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); + } else { + int buffer_idx = 0; + for (int64_t i = 0; i < data->length(); i++) { + if (!data->IsNull(i)) { + buffer_ptr[buffer_idx++] = FixedLenByteArray(data->GetValue(i)); + } + } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); + } + PARQUET_CATCH_NOT_OK(writer->Close()); + return Status::OK(); +} + Status FileWriter::Impl::Close() { if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); } PARQUET_CATCH_NOT_OK(writer_->Close()); @@ -618,6 +651,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) { WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType) WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType) WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType) + WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType) WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type) WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type) WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
