Repository: parquet-cpp Updated Branches: refs/heads/master b1c85caf9 -> ad56e7aea
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc index 993ff67..7556313 100644 --- a/src/parquet/arrow/writer.cc +++ b/src/parquet/arrow/writer.cc @@ -26,11 +26,18 @@ #include "parquet/arrow/schema.h" #include "arrow/api.h" +#include "arrow/type_traits.h" +using arrow::Array; using arrow::BinaryArray; +using arrow::BooleanArray; +using arrow::Int16Array; +using arrow::Int16Builder; +using arrow::Field; using arrow::MemoryPool; using arrow::PoolBuffer; using arrow::PrimitiveArray; +using arrow::ListArray; using arrow::Status; using arrow::Table; @@ -43,19 +50,215 @@ namespace arrow { namespace BitUtil = ::arrow::BitUtil; +class LevelBuilder : public ::arrow::ArrayVisitor { + public: + explicit LevelBuilder(MemoryPool* pool) + : def_levels_(pool, ::arrow::int16()), rep_levels_(pool, ::arrow::int16()) { + def_levels_buffer_ = std::make_shared<PoolBuffer>(pool); + } + +#define PRIMITIVE_VISIT(ArrowTypePrefix) \ + Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \ + valid_bitmaps_.push_back(array.null_bitmap_data()); \ + null_counts_.push_back(array.null_count()); \ + values_type_ = array.type_enum(); \ + values_array_ = &array; \ + return Status::OK(); \ + } + + PRIMITIVE_VISIT(Boolean) + PRIMITIVE_VISIT(Int8) + PRIMITIVE_VISIT(Int16) + PRIMITIVE_VISIT(Int32) + PRIMITIVE_VISIT(Int64) + PRIMITIVE_VISIT(UInt8) + PRIMITIVE_VISIT(UInt16) + PRIMITIVE_VISIT(UInt32) + PRIMITIVE_VISIT(UInt64) + PRIMITIVE_VISIT(HalfFloat) + PRIMITIVE_VISIT(Float) + PRIMITIVE_VISIT(Double) + PRIMITIVE_VISIT(String) + PRIMITIVE_VISIT(Binary) + PRIMITIVE_VISIT(Date) + PRIMITIVE_VISIT(Time) + PRIMITIVE_VISIT(Timestamp) + PRIMITIVE_VISIT(Interval) + + Status Visit(const ListArray& array) override { + valid_bitmaps_.push_back(array.null_bitmap_data()); + null_counts_.push_back(array.null_count()); 
+ offsets_.push_back(array.raw_offsets()); + + min_offset_idx_ = array.raw_offsets()[min_offset_idx_]; + max_offset_idx_ = array.raw_offsets()[max_offset_idx_]; + + return array.values()->Accept(this); + } + +#define NOT_IMPLEMENTED_VIST(ArrowTypePrefix) \ + Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \ + return Status::NotImplemented( \ + "Level generation for ArrowTypePrefix not supported yet"); \ + }; + + NOT_IMPLEMENTED_VIST(Null) + NOT_IMPLEMENTED_VIST(Struct) + NOT_IMPLEMENTED_VIST(Union) + NOT_IMPLEMENTED_VIST(Decimal) + NOT_IMPLEMENTED_VIST(Dictionary) + + Status GenerateLevels(const Array* array, int64_t offset, int64_t length, + const std::shared_ptr<Field>& field, int64_t* values_offset, + ::arrow::Type::type* values_type, int64_t* num_values, int64_t* num_levels, + std::shared_ptr<Buffer>* def_levels, std::shared_ptr<Buffer>* rep_levels, + const Array** values_array) { + // Work downwards to extract bitmaps and offsets + min_offset_idx_ = offset; + max_offset_idx_ = offset + length; + RETURN_NOT_OK(array->Accept(this)); + *num_values = max_offset_idx_ - min_offset_idx_; + *values_offset = min_offset_idx_; + *values_type = values_type_; + *values_array = values_array_; + + // Walk downwards to extract nullability + std::shared_ptr<Field> current_field = field; + nullable_.push_back(current_field->nullable); + while (current_field->type->num_children() > 0) { + if (current_field->type->num_children() > 1) { + return Status::NotImplemented( + "Fields with more than one child are not supported."); + } else { + current_field = current_field->type->child(0); + } + nullable_.push_back(current_field->nullable); + } + + // Generate the levels. 
+ if (nullable_.size() == 1) { + // We have a PrimitiveArray + *rep_levels = nullptr; + if (nullable_[0]) { + RETURN_NOT_OK(def_levels_buffer_->Resize(length * sizeof(int16_t))); + auto def_levels_ptr = + reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data()); + if (array->null_count() == 0) { + std::fill(def_levels_ptr, def_levels_ptr + length, 1); + } else { + const uint8_t* valid_bits = array->null_bitmap_data(); + INIT_BITSET(valid_bits, offset); + for (int i = 0; i < length; i++) { + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { + def_levels_ptr[i] = 1; + } else { + def_levels_ptr[i] = 0; + } + READ_NEXT_BITSET(valid_bits); + } + } + *def_levels = def_levels_buffer_; + } else { + *def_levels = nullptr; + } + *num_levels = length; + } else { + RETURN_NOT_OK(rep_levels_.Append(0)); + HandleListEntries(0, 0, offset, length); + + std::shared_ptr<Array> def_levels_array; + RETURN_NOT_OK(def_levels_.Finish(&def_levels_array)); + *def_levels = static_cast<PrimitiveArray*>(def_levels_array.get())->data(); + + std::shared_ptr<Array> rep_levels_array; + RETURN_NOT_OK(rep_levels_.Finish(&rep_levels_array)); + *rep_levels = static_cast<PrimitiveArray*>(rep_levels_array.get())->data(); + *num_levels = rep_levels_array->length(); + } + + return Status::OK(); + } + + Status HandleList(int16_t def_level, int16_t rep_level, int64_t index) { + if (nullable_[rep_level]) { + if (null_counts_[rep_level] == 0 || + BitUtil::GetBit(valid_bitmaps_[rep_level], index)) { + return HandleNonNullList(def_level + 1, rep_level, index); + } else { + return def_levels_.Append(def_level); + } + } else { + return HandleNonNullList(def_level, rep_level, index); + } + } + + Status HandleNonNullList(int16_t def_level, int16_t rep_level, int64_t index) { + int32_t inner_offset = offsets_[rep_level][index]; + int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset; + int64_t recursion_level = rep_level + 1; + if (inner_length == 0) { return 
def_levels_.Append(def_level); } + if (recursion_level < static_cast<int64_t>(offsets_.size())) { + return HandleListEntries(def_level + 1, rep_level + 1, inner_offset, inner_length); + } else { + // We have reached the leaf: primitive list, handle remaining nullables + for (int64_t i = 0; i < inner_length; i++) { + if (i > 0) { RETURN_NOT_OK(rep_levels_.Append(rep_level + 1)); } + if (nullable_[recursion_level] && + ((null_counts_[recursion_level] == 0) || + BitUtil::GetBit(valid_bitmaps_[recursion_level], inner_offset + i))) { + RETURN_NOT_OK(def_levels_.Append(def_level + 2)); + } else { + // This can be produced in two case: + // * elements are nullable and this one is null (i.e. max_def_level = def_level + // + 2) + // * elements are non-nullable (i.e. max_def_level = def_level + 1) + RETURN_NOT_OK(def_levels_.Append(def_level + 1)); + } + } + return Status::OK(); + } + } + + Status HandleListEntries( + int16_t def_level, int16_t rep_level, int64_t offset, int64_t length) { + for (int64_t i = 0; i < length; i++) { + if (i > 0) { RETURN_NOT_OK(rep_levels_.Append(rep_level)); } + RETURN_NOT_OK(HandleList(def_level, rep_level, offset + i)); + } + return Status::OK(); + } + + private: + Int16Builder def_levels_; + std::shared_ptr<PoolBuffer> def_levels_buffer_; + Int16Builder rep_levels_; + + std::vector<int64_t> null_counts_; + std::vector<const uint8_t*> valid_bitmaps_; + std::vector<const int32_t*> offsets_; + std::vector<bool> nullable_; + + int32_t min_offset_idx_; + int32_t max_offset_idx_; + ::arrow::Type::type values_type_; + const Array* values_array_; +}; + class FileWriter::Impl { public: Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer); Status NewRowGroup(int64_t chunk_size); template <typename ParquetType, typename ArrowType> - Status TypedWriteBatch( - ColumnWriter* writer, const PrimitiveArray* data, int64_t offset, int64_t length); + Status TypedWriteBatch(ColumnWriter* writer, const Array* data, int64_t offset, + int64_t 
num_values, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels); template <typename ParquetType, typename ArrowType> - Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t length, - const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr); + Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values, + int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels, + const uint8_t* valid_bits, int64_t valid_bits_offset, + const typename ArrowType::c_type* data_ptr); // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary // buffer @@ -85,8 +288,7 @@ class FileWriter::Impl { return Status::OK(); } - Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length); - Status WriteFlatColumnChunk(const BinaryArray* data, int64_t offset, int64_t length); + Status WriteColumnChunk(const Array* data, int64_t offset, int64_t length); Status Close(); virtual ~Impl() {} @@ -98,7 +300,6 @@ class FileWriter::Impl { // Buffer used for storing the data of an array converted to the physical type // as expected by parquet-cpp. 
PoolBuffer data_buffer_; - PoolBuffer def_levels_buffer_; std::unique_ptr<ParquetFileWriter> writer_; RowGroupWriter* row_group_writer_; }; @@ -116,47 +317,28 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) { } template <typename ParquetType, typename ArrowType> -Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, - const PrimitiveArray* data, int64_t offset, int64_t length) { +Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, const Array* array, + int64_t offset, int64_t num_values, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels) { using ArrowCType = typename ArrowType::c_type; using ParquetCType = typename ParquetType::c_type; - DCHECK((offset + length) <= data->length()); + DCHECK((offset + num_values) <= array->length()); + auto data = static_cast<const PrimitiveArray*>(array); auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset; auto writer = reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer); - if (writer->descr()->max_definition_level() == 0) { + + if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) { // no nulls, just dump the data const ParquetCType* data_writer_ptr = nullptr; RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>( - data_ptr, length, &data_writer_ptr))); - PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr)); - } else if (writer->descr()->max_definition_level() == 1) { - RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); - int16_t* def_levels_ptr = - reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); - if (data->null_count() == 0) { - std::fill(def_levels_ptr, def_levels_ptr + length, 1); - const ParquetCType* data_writer_ptr = nullptr; - RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>( - data_ptr, length, &data_writer_ptr))); - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(length, def_levels_ptr, nullptr, 
data_writer_ptr)); - } else { - const uint8_t* valid_bits = data->null_bitmap_data(); - INIT_BITSET(valid_bits, offset); - for (int i = 0; i < length; i++) { - if (bitset & (1 << bit_offset)) { - def_levels_ptr[i] = 1; - } else { - def_levels_ptr[i] = 0; - } - READ_NEXT_BITSET(valid_bits); - } - RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>( - writer, length, def_levels_ptr, nullptr, valid_bits, offset, data_ptr))); - } + data_ptr, num_values, &data_writer_ptr))); + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, data_writer_ptr)); } else { - return Status::NotImplemented("no support for max definition level > 1 yet"); + const uint8_t* valid_bits = data->null_bitmap_data(); + RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, num_values, + num_levels, def_levels, rep_levels, valid_bits, offset, data_ptr))); } PARQUET_CATCH_NOT_OK(writer->Close()); return Status::OK(); @@ -164,22 +346,22 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, template <typename ParquetType, typename ArrowType> Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, - int64_t length, const int16_t* def_levels, const int16_t* rep_levels, - const uint8_t* valid_bits, int64_t valid_bits_offset, + int64_t num_values, int64_t num_levels, const int16_t* def_levels, + const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr) { using ParquetCType = typename ParquetType::c_type; - RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType))); + RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType))); auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data()); INIT_BITSET(valid_bits, valid_bits_offset); - for (int i = 0; i < length; i++) { - if (bitset & (1 << bit_offset)) { + for (int i = 0; i < num_values; i++) { + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { buffer_ptr[i] = 
static_cast<ParquetCType>(data_ptr[i]); } READ_NEXT_BITSET(valid_bits); } PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced( - length, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr)); + num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr)); return Status::OK(); } @@ -187,16 +369,17 @@ Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writ #define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType) \ template <> \ Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>( \ - TypedColumnWriter<ParquetType> * writer, int64_t length, \ + TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \ int64_t valid_bits_offset, const CType* data_ptr) { \ PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced( \ - length, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \ + num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \ return Status::OK(); \ } NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t) NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t) +NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t) NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float) NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) @@ -206,99 +389,34 @@ NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double) // ArrowType::c_type to ParquetType::c_type template <> Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>( - ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset, - int64_t length) { - DCHECK((offset + length) <= data->length()); - RETURN_NOT_OK(data_buffer_.Resize(length)); + ColumnWriter* column_writer, const Array* array, int64_t offset, int64_t num_values, + int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { + DCHECK((offset 
+ num_values) <= array->length()); + RETURN_NOT_OK(data_buffer_.Resize(num_values)); + auto data = static_cast<const BooleanArray*>(array); auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data()); auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data()); auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(column_writer); - if (writer->descr()->max_definition_level() == 0) { - // no nulls, just dump the data - for (int64_t i = 0; i < length; i++) { - buffer_ptr[i] = BitUtil::GetBit(data_ptr, offset + i); - } - PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr)); - } else if (writer->descr()->max_definition_level() == 1) { - RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); - int16_t* def_levels_ptr = - reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); - if (data->null_count() == 0) { - std::fill(def_levels_ptr, def_levels_ptr + length, 1); - for (int64_t i = 0; i < length; i++) { - buffer_ptr[i] = BitUtil::GetBit(data_ptr, offset + i); - } - // TODO(PARQUET-644): write boolean values as a packed bitmap - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); - } else { - int buffer_idx = 0; - for (int i = 0; i < length; i++) { - if (data->IsNull(offset + i)) { - def_levels_ptr[i] = 0; - } else { - def_levels_ptr[i] = 1; - buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i); - } - } - PARQUET_CATCH_NOT_OK( - writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); + + int buffer_idx = 0; + for (int i = 0; i < num_values; i++) { + if (!data->IsNull(offset + i)) { + buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i); } - } else { - return Status::NotImplemented("no support for max definition level > 1 yet"); } + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); PARQUET_CATCH_NOT_OK(writer->Close()); return Status::OK(); } -Status FileWriter::Impl::Close() { - 
if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); } - PARQUET_CATCH_NOT_OK(writer_->Close()); - return Status::OK(); -} - -#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \ - case ::arrow::Type::ENUM: \ - return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \ - break; - -Status FileWriter::Impl::WriteFlatColumnChunk( - const PrimitiveArray* data, int64_t offset, int64_t length) { - ColumnWriter* writer; - PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn()); - switch (data->type_enum()) { - TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType) - TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type) - TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type) - TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type) - TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type) - case ::arrow::Type::UINT32: - if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) { - // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need - // to use the larger Int64Type to store them lossless. 
- return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>( - writer, data, offset, length); - } else { - return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>( - writer, data, offset, length); - } - TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type) - TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type) - TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type) - TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type) - TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType) - TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType) - default: - return Status::NotImplemented(data->type()->ToString()); - } -} - -Status FileWriter::Impl::WriteFlatColumnChunk( - const BinaryArray* data, int64_t offset, int64_t length) { - ColumnWriter* column_writer; - PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn()); - DCHECK((offset + length) <= data->length()); - RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ByteArray))); +template <> +Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>( + ColumnWriter* column_writer, const Array* array, int64_t offset, int64_t num_values, + int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) { + DCHECK((offset + num_values) <= array->length()); + RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ByteArray))); + auto data = static_cast<const BinaryArray*>(array); auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data()); // In the case of an array consisting of only empty strings or all null, // data->data() points already to a nullptr, thus data->data()->data() will @@ -309,39 +427,36 @@ Status FileWriter::Impl::WriteFlatColumnChunk( DCHECK(data_ptr != nullptr); } auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer); - if (writer->descr()->max_definition_level() > 0) { - RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t))); - } - int16_t* def_levels_ptr = 
reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()); - if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) { + + if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) { // no nulls, just dump the data - for (int64_t i = 0; i < length; i++) { + for (int64_t i = 0; i < num_values; i++) { buffer_ptr[i] = ByteArray(data->value_length(i + offset), data_ptr + data->value_offset(i)); } - if (writer->descr()->max_definition_level() > 0) { - std::fill(def_levels_ptr, def_levels_ptr + length, 1); - } - PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); - } else if (writer->descr()->max_definition_level() == 1) { + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); + } else { int buffer_idx = 0; - for (int64_t i = 0; i < length; i++) { - if (data->IsNull(offset + i)) { - def_levels_ptr[i] = 0; - } else { - def_levels_ptr[i] = 1; + for (int64_t i = 0; i < num_values; i++) { + if (!data->IsNull(offset + i)) { buffer_ptr[buffer_idx++] = ByteArray( data->value_length(i + offset), data_ptr + data->value_offset(i + offset)); } } - PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr)); - } else { - return Status::NotImplemented("no support for max definition level > 1 yet"); + PARQUET_CATCH_NOT_OK( + writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr)); } PARQUET_CATCH_NOT_OK(writer->Close()); return Status::OK(); } +Status FileWriter::Impl::Close() { + if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); } + PARQUET_CATCH_NOT_OK(writer_->Close()); + return Status::OK(); +} + FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer) : impl_(new FileWriter::Impl(pool, std::move(writer))) {} @@ -349,22 +464,83 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) { return impl_->NewRowGroup(chunk_size); } -Status 
FileWriter::WriteFlatColumnChunk( +Status FileWriter::Impl::WriteColumnChunk( + const Array* data, int64_t offset, int64_t length) { + ColumnWriter* column_writer; + PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn()); + DCHECK((offset + length) <= data->length()); + + int current_column_idx = row_group_writer_->current_column(); + std::shared_ptr<::arrow::Schema> arrow_schema; + RETURN_NOT_OK( + FromParquetSchema(writer_->schema(), {current_column_idx - 1}, &arrow_schema)); + LevelBuilder level_builder(pool_); + std::shared_ptr<Buffer> def_levels_buffer; + std::shared_ptr<Buffer> rep_levels_buffer; + int64_t values_offset; + ::arrow::Type::type values_type; + int64_t num_levels; + int64_t num_values; + const Array* values_array; + RETURN_NOT_OK(level_builder.GenerateLevels(data, offset, length, arrow_schema->field(0), + &values_offset, &values_type, &num_values, &num_levels, &def_levels_buffer, + &rep_levels_buffer, &values_array)); + const int16_t* def_levels = nullptr; + if (def_levels_buffer) { + def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data()); + } + const int16_t* rep_levels = nullptr; + if (rep_levels_buffer) { + rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data()); + } + +#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType) \ + case ::arrow::Type::ArrowEnum: \ + return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(column_writer, values_array, \ + values_offset, num_values, num_levels, def_levels, rep_levels); \ + break; + + switch (values_type) { + case ::arrow::Type::UINT32: { + if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) { + // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need + // to use the larger Int64Type to store them lossless. 
+ return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(column_writer, + values_array, values_offset, num_values, num_levels, def_levels, rep_levels); + } else { + return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(column_writer, + values_array, values_offset, num_values, num_levels, def_levels, rep_levels); + } + } + WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType) + WRITE_BATCH_CASE(INT8, Int8Type, Int32Type) + WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type) + WRITE_BATCH_CASE(INT16, Int16Type, Int32Type) + WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type) + WRITE_BATCH_CASE(INT32, Int32Type, Int32Type) + WRITE_BATCH_CASE(INT64, Int64Type, Int64Type) + WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type) + WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type) + WRITE_BATCH_CASE(FLOAT, FloatType, FloatType) + WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType) + WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType) + WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType) + default: + std::stringstream ss; + ss << "Data type not supported as list value: " << values_array->type()->ToString(); + return Status::NotImplemented(ss.str()); + } + + PARQUET_CATCH_NOT_OK(column_writer->Close()); + + return Status::OK(); +} + +Status FileWriter::WriteColumnChunk( const ::arrow::Array* array, int64_t offset, int64_t length) { int64_t real_length = length; if (length == -1) { real_length = array->length(); } - if (array->type_enum() == ::arrow::Type::STRING || - array->type_enum() == ::arrow::Type::BINARY) { - auto binary_array = static_cast<const ::arrow::BinaryArray*>(array); - DCHECK(binary_array); - return impl_->WriteFlatColumnChunk(binary_array, offset, real_length); - } else { - auto primitive_array = dynamic_cast<const PrimitiveArray*>(array); - if (!primitive_array) { - return Status::NotImplemented("Table must consist of PrimitiveArray instances"); - } - return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length); - } + return impl_->WriteColumnChunk(array, offset, 
real_length); } Status FileWriter::Close() { @@ -377,7 +553,7 @@ MemoryPool* FileWriter::memory_pool() const { FileWriter::~FileWriter() {} -Status WriteFlatTable(const Table* table, MemoryPool* pool, +Status WriteTable(const Table* table, MemoryPool* pool, const std::shared_ptr<OutputStream>& sink, int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties) { std::shared_ptr<SchemaDescriptor> parquet_schema; @@ -400,8 +576,8 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool, int64_t size = std::min(chunk_size, table->num_rows() - offset); RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close())); for (int i = 0; i < table->num_columns(); i++) { - std::shared_ptr<::arrow::Array> array = table->column(i)->data()->chunk(0); - RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size), + std::shared_ptr<Array> array = table->column(i)->data()->chunk(0); + RETURN_NOT_OK_ELSE(writer.WriteColumnChunk(array.get(), offset, size), PARQUET_IGNORE_NOT_OK(writer.Close())); } } @@ -409,11 +585,11 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool, return writer.Close(); } -Status WriteFlatTable(const Table* table, MemoryPool* pool, +Status WriteTable(const Table* table, MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties) { auto wrapper = std::make_shared<ArrowOutputStream>(sink); - return WriteFlatTable(table, pool, wrapper, chunk_size, properties); + return WriteTable(table, pool, wrapper, chunk_size, properties); } } // namespace arrow http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h index a82c2f6..4a39c99 100644 --- a/src/parquet/arrow/writer.h +++ b/src/parquet/arrow/writer.h @@ -50,7 +50,7 @@ class PARQUET_EXPORT FileWriter 
{ FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer); ::arrow::Status NewRowGroup(int64_t chunk_size); - ::arrow::Status WriteFlatColumnChunk( + ::arrow::Status WriteColumnChunk( const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1); ::arrow::Status Close(); @@ -64,16 +64,16 @@ class PARQUET_EXPORT FileWriter { }; /** - * Write a flat Table to Parquet. + * Write a Table to Parquet. * - * The table shall only consist of nullable, non-repeated columns of primitive type. + * The table shall only consist of columns of primitive type or of primitive lists. */ -::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table, +::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table* table, ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink, int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); -::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table, +::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table* table, ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties = default_writer_properties()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/column-reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index d410b5f..6bf6651 100644 --- a/src/parquet/column/column-reader-test.cc +++ b/src/parquet/column/column-reader-test.cc @@ -46,14 +46,21 @@ template <typename T> static inline bool vector_equal_with_def_levels(const vector<T>& left, const vector<int16_t> def_levels, int16_t max_def_levels, const vector<T>& right) { size_t i_left = 0; - for (size_t i = 0; i < right.size(); ++i) { - if (def_levels[i] != max_def_levels) { continue; } - if (left[i_left] != 
right[i]) { - std::cerr << "index " << i << " left was " << left[i_left] << " right was " - << right[i] << std::endl; - return false; + size_t i_right = 0; + for (size_t i = 0; i < def_levels.size(); i++) { + if (def_levels[i] == max_def_levels) { + // Compare + if (left[i_left] != right[i_right]) { + std::cerr << "index " << i << " left was " << left[i_left] << " right was " + << right[i] << std::endl; + return false; + } + i_left++; + i_right++; + } else if (def_levels[i] == (max_def_levels - 1)) { + // Null entry on the lowest nested level + i_right++; } - i_left++; } return true; @@ -107,7 +114,10 @@ class TestPrimitiveReader : public ::testing::Test { vector<uint8_t> valid_bits(num_levels_, 255); int total_values_read = 0; int batch_actual = 0; - int null_count = -1; + int levels_actual = 0; + int64_t null_count = -1; + int64_t levels_read = 0; + int64_t values_read; Int32Reader* reader = static_cast<Int32Reader*>(reader_.get()); int32_t batch_size = 8; @@ -116,14 +126,17 @@ class TestPrimitiveReader : public ::testing::Test { // 1) batch_size < page_size (multiple ReadBatch from a single page) // 2) batch_size > page_size (BatchRead limits to a single page) do { - batch = reader->ReadBatchSpaced(batch_size, dresult.data() + batch_actual, - rresult.data() + batch_actual, vresult.data() + batch_actual, &null_count, - valid_bits.data() + batch_actual, 0); + batch = reader->ReadBatchSpaced(batch_size, dresult.data() + levels_actual, + rresult.data() + levels_actual, vresult.data() + batch_actual, + valid_bits.data() + batch_actual, 0, &levels_read, &values_read, &null_count); total_values_read += batch - null_count; batch_actual += batch; + levels_actual += levels_read; batch_size = std::max(batch_size * 2, 4096); - } while (batch > 0); + } while ((batch > 0) || (levels_read > 0)); + ASSERT_EQ(num_levels_, levels_actual); + ASSERT_EQ(num_values_, total_values_read); if (max_def_level_ > 0) { ASSERT_TRUE(vector_equal(def_levels_, dresult)); ASSERT_TRUE( @@ -132,11 
+145,9 @@ class TestPrimitiveReader : public ::testing::Test { ASSERT_TRUE(vector_equal(values_, vresult)); } if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); } - ASSERT_EQ(num_levels_, batch_actual); - ASSERT_EQ(num_values_, total_values_read); // catch improper writes at EOS - batch_actual = reader->ReadBatchSpaced( - 5, nullptr, nullptr, nullptr, &null_count, valid_bits.data(), 0); + batch_actual = reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr, + valid_bits.data(), 0, &levels_read, &values_read, &null_count); ASSERT_EQ(0, batch_actual); ASSERT_EQ(0, null_count); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 90c0761..7924e55 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -130,25 +130,42 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader { int64_t* values_read); /// Read a batch of repetition levels, definition levels, and values from the - /// column and leave spaces for null entries in the values buffer. + /// column and leave spaces for null entries on the lowest level in the values + /// buffer. /// /// In comparision to ReadBatch the length of repetition and definition levels - /// is the same as of the number of values read. + /// is the same as of the number of values read for max_definition_level == 1. + /// In the case of max_definition_level > 1, the repetition and definition + /// levels are larger than the values but the values include the null entries + /// with definition_level == (max_definition_level - 1). /// /// To fully exhaust a row group, you must read batches until the number of /// values read reaches the number of stored values according to the metadata. 
/// - /// @param valid_bits Memory allocated for a bitmap that indicates if + /// @param batch_size the number of levels to read + /// @param[out] def_levels The Parquet definition levels, output has + /// the length levels_read. + /// @param[out] rep_levels The Parquet repetition levels, output has + /// the length levels_read. + /// @param[out] values The values in the lowest nested level including + /// spacing for nulls on the lowest levels; output has the length + /// values_read. + /// @param[out] valid_bits Memory allocated for a bitmap that indicates if /// the row is null or on the maximum definition level. For performance /// reasons the underlying buffer should be able to store 1 bit more than /// required. If this requires an additional byte, this byte is only read /// but never written to. /// @param valid_bits_offset The offset in bits of the valid_bits where the - /// first relevant bit resides. - /// - /// @return actual number of levels read + /// first relevant bit resides. + /// @param[out] levels_read The number of repetition/definition levels that were read. + /// @param[out] values_read The number of values read; this includes all + /// non-null entries as well as all null entries on the lowest level + /// (i.e. definition_level == max_definition_level - 1) + /// @param[out] null_count The number of nulls on the lowest levels. + /// (i.e.
(values_read - null_count) is total number of non-null entries) int64_t ReadBatchSpaced(int batch_size, int16_t* def_levels, int16_t* rep_levels, - T* values, int* null_count, uint8_t* valid_bits, int64_t valid_bits_offset); + T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read, + int64_t* values_read, int64_t* null_count); // Skip reading levels // Returns the number of levels skipped @@ -244,8 +261,8 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_ } inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels, - int16_t max_definition_level, int* null_count, uint8_t* valid_bits, - int64_t valid_bits_offset) { + int16_t max_definition_level, int64_t* values_read, int64_t* null_count, + uint8_t* valid_bits, int64_t valid_bits_offset) { int byte_offset = valid_bits_offset / 8; int bit_offset = valid_bits_offset % 8; uint8_t bitset = valid_bits[byte_offset]; @@ -253,9 +270,11 @@ inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_ for (int i = 0; i < num_def_levels; ++i) { if (def_levels[i] == max_definition_level) { bitset |= (1 << bit_offset); - } else { + } else if (def_levels[i] == (max_definition_level - 1)) { bitset &= ~(1 << bit_offset); *null_count += 1; + } else { + continue; } bit_offset++; @@ -268,14 +287,18 @@ inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_ } } if (bit_offset != 0) { valid_bits[byte_offset] = bitset; } + *values_read = (bit_offset + byte_offset * 8 - valid_bits_offset); } template <typename DType> inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int batch_size, - int16_t* def_levels, int16_t* rep_levels, T* values, int* null_count_out, - uint8_t* valid_bits, int64_t valid_bits_offset) { + int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits, + int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read, + int64_t* null_count_out) { // HasNext invokes 
ReadNewPage if (!HasNext()) { + *levels_read = 0; + *values_read = 0; *null_count_out = 0; return 0; } @@ -297,15 +320,28 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int batch_size, } } - // TODO: Move this into the DefinitionLevels reader - int null_count = 0; - int16_t max_definition_level = descr_->max_definition_level(); - DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level, - &null_count, valid_bits, valid_bits_offset); + int64_t null_count = 0; + if (descr_->schema_node()->is_required()) { + // Node is required so there are no null entries on the lowest nesting level. + int values_to_read = 0; + for (int64_t i = 0; i < num_def_levels; ++i) { + if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; } + } + total_values = ReadValues(values_to_read, values); + for (int64_t i = 0; i < total_values; i++) { + ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i); + } + *values_read = total_values; + } else { + int16_t max_definition_level = descr_->max_definition_level(); + DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level, + values_read, &null_count, valid_bits, valid_bits_offset); + total_values = ReadValuesSpaced( + *values_read, values, null_count, valid_bits, valid_bits_offset); + } + *levels_read = num_def_levels; *null_count_out = null_count; - total_values = ReadValuesSpaced( - num_def_levels, values, null_count, valid_bits, valid_bits_offset); } else { // Required field, read all values total_values = ReadValues(batch_size, values); @@ -313,9 +349,10 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int batch_size, ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i); } *null_count_out = 0; + *levels_read = total_values; } - num_decoded_values_ += total_values; + num_decoded_values_ += *levels_read; return total_values; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/statistics.cc 
---------------------------------------------------------------------- diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc index df7a308..7d6aebb 100644 --- a/src/parquet/column/statistics.cc +++ b/src/parquet/column/statistics.cc @@ -124,13 +124,13 @@ void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values, int64_t length = num_null + num_not_null; int64_t i = 0; for (; i < length; i++) { - if (bitset & (1 << bit_offset)) { break; } + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { break; } READ_NEXT_BITSET(valid_bits); } T min = values[i]; T max = values[i]; for (; i < length; i++) { - if (bitset & (1 << bit_offset)) { + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { if (compare(values[i], min)) { min = values[i]; } else if (compare(max, values[i])) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index f06ac30..315c42f 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -361,18 +361,24 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values, template <typename DType> inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, - int64_t valid_bits_offset, const T* values) { + int64_t valid_bits_offset, const T* values, int64_t* num_spaced_written) { int64_t values_to_write = 0; + int64_t spaced_values_to_write = 0; // If the field is required and non-repeated, there are no definition levels if (descr_->max_definition_level() > 0) { + // Minimal definition level for which spaced values are written + int16_t min_spaced_def_level = descr_->max_definition_level(); + if (descr_->schema_node()->is_optional()) { min_spaced_def_level--; } for (int64_t i = 0; i < num_values; ++i) 
{ if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; } + if (def_levels[i] >= min_spaced_def_level) { ++spaced_values_to_write; } } WriteDefinitionLevels(num_values, def_levels); } else { // Required field, write all values values_to_write = num_values; + spaced_values_to_write = num_values; } // Not present for non-repeated fields @@ -393,7 +399,12 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values throw ParquetException("More rows were written in the column chunk than expected"); } - WriteValuesSpaced(num_values, valid_bits, valid_bits_offset, values); + if (descr_->schema_node()->is_optional()) { + WriteValuesSpaced(spaced_values_to_write, valid_bits, valid_bits_offset, values); + } else { + WriteValues(values_to_write, values); + } + *num_spaced_written = spaced_values_to_write; if (page_statistics_ != nullptr) { page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write, @@ -447,15 +458,20 @@ void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values, int64_t write_batch_size = properties_->write_batch_size(); int num_batches = num_values / write_batch_size; int64_t num_remaining = num_values % write_batch_size; + int64_t num_spaced_written = 0; + int64_t values_offset = 0; for (int round = 0; round < num_batches; round++) { int64_t offset = round * write_batch_size; WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset], - valid_bits, valid_bits_offset + offset, &values[offset]); + valid_bits, valid_bits_offset + values_offset, values + values_offset, + &num_spaced_written); + values_offset += num_spaced_written; } // Write the remaining values int64_t offset = num_batches * write_batch_size; WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset], - valid_bits, valid_bits_offset + offset, &values[offset]); + valid_bits, valid_bits_offset + values_offset, values + values_offset, + &num_spaced_written); } template <typename 
DType> http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h index 094c65b..6ab84b2 100644 --- a/src/parquet/column/writer.h +++ b/src/parquet/column/writer.h @@ -181,7 +181,7 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter { int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, - const T* values); + const T* values, int64_t* num_spaced_written); typedef Encoder<DType> EncoderType; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/encodings/dictionary-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h index d465300..7128500 100644 --- a/src/parquet/encodings/dictionary-encoding.h +++ b/src/parquet/encodings/dictionary-encoding.h @@ -243,7 +243,7 @@ class DictEncoder : public Encoder<DType> { int64_t valid_bits_offset) override { INIT_BITSET(valid_bits, valid_bits_offset); for (int32_t i = 0; i < num_values; i++) { - if (bitset & (1 << bit_offset)) { Put(src[i]); } + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { Put(src[i]); } READ_NEXT_BITSET(valid_bits); } } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/encodings/encoder.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h index 35a36d3..1c06574 100644 --- a/src/parquet/encodings/encoder.h +++ b/src/parquet/encodings/encoder.h @@ -52,7 +52,9 @@ class Encoder { INIT_BITSET(valid_bits, valid_bits_offset); T* data = reinterpret_cast<T*>(buffer.mutable_data()); for (int32_t i = 0; i < num_values; i++) { - if (bitset & (1 << 
bit_offset)) { data[num_valid_values++] = src[i]; } + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { + data[num_valid_values++] = src[i]; + } READ_NEXT_BITSET(valid_bits); } Put(data, num_valid_values); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/metadata.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index 0fa4e44..de7c4e4 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -581,6 +581,8 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl { return column_builder_ptr; } + int current_column() { return current_column_; } + void Finish(int64_t total_bytes_written) { if (!(current_column_ == schema_->num_columns())) { std::stringstream ss; @@ -635,6 +637,10 @@ ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() { return impl_->NextColumnChunk(); } +int RowGroupMetaDataBuilder::current_column() const { + return impl_->current_column(); +} + int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/metadata.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index 1f8b09f..eab7fc6 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -196,6 +196,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder { ColumnChunkMetaDataBuilder* NextColumnChunk(); int num_columns(); + int current_column() const; // commit the metadata void Finish(int64_t total_bytes_written); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index 877f668..8c1316b 100644 --- 
a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -174,6 +174,10 @@ ColumnWriter* RowGroupSerializer::NextColumn() { return current_column_writer_.get(); } +int RowGroupSerializer::current_column() const { + return metadata_->current_column(); +} + void RowGroupSerializer::Close() { if (!closed_) { closed_ = true; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index f803f92..0140c5b 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -82,6 +82,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents { int64_t num_rows() const override; ColumnWriter* NextColumn() override; + int current_column() const override; void Close() override; private: http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc index a381c22..8d7d208 100644 --- a/src/parquet/file/writer.cc +++ b/src/parquet/file/writer.cc @@ -41,6 +41,10 @@ ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); } +int RowGroupWriter::current_column() { + return contents_->current_column(); +} + // ---------------------------------------------------------------------- // ParquetFileWriter public API http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h index 07ccb51..ab32137 100644 --- a/src/parquet/file/writer.h +++ b/src/parquet/file/writer.h @@ -42,6 +42,7 @@ class PARQUET_EXPORT RowGroupWriter { virtual int64_t num_rows() const = 0; virtual ColumnWriter* 
NextColumn() = 0; + virtual int current_column() const = 0; virtual void Close() = 0; }; @@ -56,6 +57,8 @@ class PARQUET_EXPORT RowGroupWriter { * modified anymore. */ ColumnWriter* NextColumn(); + /// Index of currently written column + int current_column(); void Close(); int num_columns() const; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/util/bit-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h index ca094bc..d1e81a3 100644 --- a/src/parquet/util/bit-util.h +++ b/src/parquet/util/bit-util.h @@ -32,17 +32,17 @@ namespace parquet { -#define INIT_BITSET(valid_bits_vector, valid_bits_index) \ - int byte_offset = valid_bits_index / 8; \ - int bit_offset = valid_bits_index % 8; \ - uint8_t bitset = valid_bits_vector[byte_offset]; - -#define READ_NEXT_BITSET(valid_bits_vector) \ - bit_offset++; \ - if (bit_offset == 8) { \ - bit_offset = 0; \ - byte_offset++; \ - bitset = valid_bits_vector[byte_offset]; \ +#define INIT_BITSET(valid_bits_vector, valid_bits_index) \ + int byte_offset_##valid_bits_vector = valid_bits_index / 8; \ + int bit_offset_##valid_bits_vector = valid_bits_index % 8; \ + uint8_t bitset_##valid_bits_vector = valid_bits_vector[byte_offset_##valid_bits_vector]; + +#define READ_NEXT_BITSET(valid_bits_vector) \ + bit_offset_##valid_bits_vector++; \ + if (bit_offset_##valid_bits_vector == 8) { \ + bit_offset_##valid_bits_vector = 0; \ + byte_offset_##valid_bits_vector++; \ + bitset_##valid_bits_vector = valid_bits_vector[byte_offset_##valid_bits_vector]; \ } // TODO(wesm): The source from Impala was depending on boost::make_unsigned http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/util/rle-encoding.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/rle-encoding.h b/src/parquet/util/rle-encoding.h index 9b2bf56..aa888ad 100644 --- 
a/src/parquet/util/rle-encoding.h +++ b/src/parquet/util/rle-encoding.h @@ -352,7 +352,7 @@ inline int RleDecoder::GetBatchWithDictSpaced(const Vector<T>& dictionary, T* va INIT_BITSET(valid_bits, valid_bits_offset); while (values_read < batch_size) { - bool is_valid = (bitset & (1 << bit_offset)); + bool is_valid = (bitset_valid_bits & (1 << bit_offset_valid_bits)); READ_NEXT_BITSET(valid_bits); if (is_valid) { @@ -366,7 +366,7 @@ inline int RleDecoder::GetBatchWithDictSpaced(const Vector<T>& dictionary, T* va repeat_count_--; while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) { - if (bitset & (1 << bit_offset)) { + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { repeat_count_--; } else { remaining_nulls--; @@ -394,7 +394,7 @@ inline int RleDecoder::GetBatchWithDictSpaced(const Vector<T>& dictionary, T* va // Read the first bitset to the end while (literals_read < literal_batch) { - if (bitset & (1 << bit_offset)) { + if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { values[values_read + literals_read + skipped] = dictionary[indices[literals_read]]; literals_read++;
