This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

The following commit(s) were added to refs/heads/master by this push:
     new fd0b90a  ARROW-3769: [C++] Add support for reading non-dictionary encoded binary Parquet columns directly as DictionaryArray
fd0b90a is described below

commit fd0b90a7f7e65fde32af04c4746004a1240914cf
Author: Hatem Helal <hhe...@mathworks.com>
AuthorDate: Sun Mar 17 19:13:41 2019 -0500

ARROW-3769: [C++] Add support for reading non-dictionary encoded binary Parquet columns directly as DictionaryArray

This patch addresses the following JIRAs:
* [ARROW-3769](https://issues.apache.org/jira/browse/ARROW-3769): refactored record reader logic to toggle between the different builders depending on the column type (String or Binary) and the requested array type (Chunked "dense" or Dictionary). These changes are covered by unittests and benchmarks.
* [PARQUET-1537](https://issues.apache.org/jira/browse/PARQUET-1537): fixed increment and covered by unittests.

Also included is an experimental class `ArrowReaderProperties` that can be used to select which columns are read directly as an `arrow::DictionaryArray`. I think some more work is needed to fully address the requests in [ARROW-3772](https://issues.apache.org/jira/browse/ARROW-3772). Namely, the ability to automatically infer which columns in a parquet file should be read as `DictionaryArray`. My current thinking is that this would be solved by introducing optional arrow type metadata [...]

Note that the behavior with this patch is that incremental reading of a parquet file will not resolve the global dictionary for all of the row groups. There are a few possible solutions for this:
* Introduce a concept of an "unknown" dictionary. This will enable concatenating multiple row groups together so long as we define unknown dictionaries as equal (assuming indices have the same data type)
* Add an API for merging the schemas from multiple tables together. This could be used after reading multiple row groups to enable concatenating the tables together into one.
* Add an API for inferring the global dictionary for the entire file. This could be an expensive operation so ideally would be made optional.
* Allow a user-specified dictionary. This could be useful in the limited case where a caller already knows the global dictionary list (computed through some other mechanism).

Author: Hatem Helal <hhe...@mathworks.com>
Author: Hatem Helal <hatem.he...@gmail.com>
Author: Hatem Helal <hatem.he...@mathworks.co.uk>

Closes #3721 from hatemhelal/arrow-3769 and squashes the following commits:

f644fff9c <Hatem Helal> Move schema fix logic to post-processing step
023c022c3 <Hatem Helal> Add virtual destructor to WrappedBuilderInterface
99e9dee12 <Hatem Helal> Removed dependencies on arrow builder in parquet/encoding
2026b513c <Hatem Helal> Rework ByteArrayDecoder interface to reduce code duplication
5bc933b97 <Hatem Helal> use PutSpaced in test setup to correctly initialize encoded data
2c8fa7efd <Hatem Helal> revert incorrect changes to PlainByteArrayDecoder::DecodeArrow method
7719b944f <Hatem Helal> Use random string generator instead of poor JSON
e6ca0db43 <Hatem Helal> Fix DictEncoding test: need to use PutSpaced instead of Put in setup
9da133142 <Hatem Helal> Temporarily disable tests for arrow builder decoding from dictionary encoded col
7347cfa26 <Hatem Helal> Fix DecodeArrow from plain encoded columns
5fb9e860a <Hatem Helal> Rework parquet encoding tests
4d7bb30de <Hatem Helal> Refactor dictionary data generation into RandomArrayGenerator
6e65fdbdf <Hatem Helal> simplify ArrowReaderProperties and mark as experimental
babe52e38 <Hatem Helal> replace deprecated ReadableFileInterface with RandomAccessFile
a267a27d4 <Hatem Helal> remove unnecessary inlines
7aac84c45 <Hatem Helal> Reworked encoding benchmark to reduce code duplication
077a8f1ae <Hatem Helal> Move function definition to (hopefully) resolve appveyor build failure due to C2491
a35754456 <Hatem Helal> Basic unittests for reading DictionaryArray directly from parquet
a6740f31e <Hatem Helal> Make sure to update the schema when reading a column as a DictionaryArray
a8c15354e <Hatem Helal> Add support for requesting a parquet column be read as a DictionaryArray
28d76b7b2 <Hatem Helal> Add benchmark for dictionary decoding using arrow builder
8f59198e8 <Hatem Helal> Add overloads for decoding using a StringDictionaryBuilder
b16eaa978 <Hatem Helal> prefer default_random_engine to avoid potential slowdown with Mersenne Twister prng
ff380211c <Hatem Helal> prefer mersenne twister prng over default one which is implementation defined
78eddb8af <Hatem Helal> Use value parameterization in decoding tests
84df23bfa <Hatem Helal> prefer range-based for loop to KeepRunning while loop pattern
f234ca2a2 <Hatem Helal> respond to code review feedback - many readability fixes in benchmark and tests
4fbcf1fab <Hatem Helal> fix loop increment in templated PlainByteArrayDecoder::DecodeArrow method
39a5f1994 <Hatem Helal> fix appveyor windows failure
89de5d5be <Hatem Helal> rework data generation so that decoding benchmark runs using a more realistic dataset
ef55081a0 <Hatem Helal> added benchmarks for decoding plain encoded data using arrow builders
31667ffbb <Hatem Helal> added tests for DictByteArrayDecoder and reworked previous tests
4a26f7405 <Hatem Helal> remove todo message
fa504158f <Hatem Helal> Implement DecodeArrowNonNull and unit tests
21fc45083 <Hatem Helal> Add some basic unittests that exercise the DecodeArrow methods
---
 cpp/src/arrow/testing/random.cc                   |  63 ++++-
 cpp/src/arrow/testing/random.h                    |  29 +++
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc |  75 ++++++
 cpp/src/parquet/arrow/reader.cc                   | 101 +++++++--
 cpp/src/parquet/arrow/reader.h                    |  48 +++-
 cpp/src/parquet/arrow/record_reader.cc            | 260 ++++++++++++---------
 cpp/src/parquet/arrow/record_reader.h             |   7 +-
 cpp/src/parquet/encoding-benchmark.cc             | 224 +++++++++++++++++-
 cpp/src/parquet/encoding-test.cc                  | 265 ++++++++++++++++++++++
 cpp/src/parquet/encoding.cc                       | 102 ++-------
 cpp/src/parquet/encoding.h                        |  78 +++++--
 11 files changed, 1004 insertions(+), 248 deletions(-)

diff --git a/cpp/src/arrow/testing/random.cc b/cpp/src/arrow/testing/random.cc
index b050c5d..f693a45 100644
--- a/cpp/src/arrow/testing/random.cc
+++ b/cpp/src/arrow/testing/random.cc
@@ -79,7 +79,7 @@ std::shared_ptr<Array> RandomArrayGenerator::Boolean(int64_t size, double probab
   // only calls the GenerateBitmap method.
   using GenOpt = GenerateOptions<int, std::uniform_int_distribution<int>>;
 
-  std::vector<std::shared_ptr<Buffer>> buffers{2};
+  BufferVector buffers{2};
   // Need 2 distinct generators such that probabilities are not shared.
   GenOpt value_gen(seed(), 0, 1, probability);
   GenOpt null_gen(seed(), 0, 1, null_probability);
@@ -100,7 +100,7 @@ static std::shared_ptr<NumericArray<ArrowType>> GenerateNumericArray(int64_t siz
                                                                      OptionType options) {
   using CType = typename ArrowType::c_type;
   auto type = TypeTraits<ArrowType>::type_singleton();
-  std::vector<std::shared_ptr<Buffer>> buffers{2};
+  BufferVector buffers{2};
   int64_t null_count = 0;
 
   ABORT_NOT_OK(AllocateEmptyBitmap(size, &buffers[0]));
@@ -145,5 +145,64 @@ PRIMITIVE_RAND_FLOAT_IMPL(Float64, double, DoubleType)
 #undef PRIMITIVE_RAND_FLOAT_IMPL
 #undef PRIMITIVE_RAND_IMPL
 
+std::shared_ptr<arrow::Array> RandomArrayGenerator::String(int64_t size,
+                                                           int32_t min_length,
+                                                           int32_t max_length,
+                                                           double null_probability) {
+  if (null_probability < 0 || null_probability > 1) {
+    ABORT_NOT_OK(Status::Invalid("null_probability must be between 0 and 1"));
+  }
+
+  auto int32_lengths = Int32(size, min_length, max_length, null_probability);
+  auto lengths = std::dynamic_pointer_cast<Int32Array>(int32_lengths);
+
+  // Visual Studio does not implement uniform_int_distribution for char types.
+  using GenOpt = GenerateOptions<uint8_t, std::uniform_int_distribution<uint16_t>>;
+  GenOpt options(seed(), static_cast<uint8_t>('A'), static_cast<uint8_t>('z'),
+                 /*null_probability=*/0);
+
+  std::vector<uint8_t> str_buffer(max_length);
+  StringBuilder builder;
+
+  for (int64_t i = 0; i < size; ++i) {
+    if (lengths->IsValid(i)) {
+      options.GenerateData(str_buffer.data(), lengths->Value(i));
+      ABORT_NOT_OK(builder.Append(str_buffer.data(), lengths->Value(i)));
+    } else {
+      ABORT_NOT_OK(builder.AppendNull());
+    }
+  }
+
+  std::shared_ptr<arrow::Array> result;
+  ABORT_NOT_OK(builder.Finish(&result));
+  return result;
+}
+
+std::shared_ptr<arrow::Array> RandomArrayGenerator::StringWithRepeats(
+    int64_t size, int64_t unique, int32_t min_length, int32_t max_length,
+    double null_probability) {
+  // Generate a random string dictionary without any nulls
+  auto array = String(unique, min_length, max_length, /*null_probability=*/0);
+  auto dictionary = std::dynamic_pointer_cast<StringArray>(array);
+
+  // Generate random indices to sample the dictionary with
+  auto id_array = Int64(size, 0, unique - 1, null_probability);
+  auto indices = std::dynamic_pointer_cast<Int64Array>(id_array);
+  StringBuilder builder;
+
+  for (int64_t i = 0; i < size; ++i) {
+    if (indices->IsValid(i)) {
+      const auto index = indices->Value(i);
+      const auto value = dictionary->GetView(index);
+      ABORT_NOT_OK(builder.Append(value));
+    } else {
+      ABORT_NOT_OK(builder.AppendNull());
+    }
+  }
+
+  std::shared_ptr<Array> result;
+  ABORT_NOT_OK(builder.Finish(&result));
+  return result;
+}
 }  // namespace random
 }  // namespace arrow
diff --git a/cpp/src/arrow/testing/random.h b/cpp/src/arrow/testing/random.h
index 1ed8d03..f69b705 100644
--- a/cpp/src/arrow/testing/random.h
+++ b/cpp/src/arrow/testing/random.h
@@ -198,6 +198,35 @@ class ARROW_EXPORT RandomArrayGenerator {
     }
   }
 
+  /// \brief Generates a random StringArray
+  ///
+  /// \param[in] size the size of the array to generate
+  /// \param[in] min_length the lower bound of the string length
+  ///            determined by the uniform distribution
+  /// \param[in] max_length the upper bound of the string length
+  ///            determined by the uniform distribution
+  /// \param[in] null_probability the probability of a row being null
+  ///
+  /// \return a generated Array
+  std::shared_ptr<arrow::Array> String(int64_t size, int32_t min_length,
+                                       int32_t max_length, double null_probability);
+
+  /// \brief Generates a random StringArray with repeated values
+  ///
+  /// \param[in] size the size of the array to generate
+  /// \param[in] unique the number of unique string values used
+  ///            to populate the array
+  /// \param[in] min_length the lower bound of the string length
+  ///            determined by the uniform distribution
+  /// \param[in] max_length the upper bound of the string length
+  ///            determined by the uniform distribution
+  /// \param[in] null_probability the probability of a row being null
+  ///
+  /// \return a generated Array
+  std::shared_ptr<arrow::Array> StringWithRepeats(int64_t size, int64_t unique,
+                                                  int32_t min_length, int32_t max_length,
+                                                  double null_probability);
+
  private:
   SeedType seed() { return seed_distribution_(seed_rng_); }
 
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 3995d2c..b54c3f4 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -30,6 +30,7 @@
 #include <vector>
 
 #include "arrow/api.h"
+#include "arrow/testing/random.h"
 #include "arrow/testing/util.h"
 #include "arrow/type_traits.h"
 #include "arrow/util/decimal.h"
@@ -2439,6 +2440,80 @@ TEST(TestArrowWriterAdHoc, SchemaMismatch) {
   ASSERT_RAISES(Invalid, writer->WriteTable(*tbl, 1));
 }
 
+// ----------------------------------------------------------------------
+// Tests for directly reading DictionaryArray
+class TestArrowReadDictionary : public ::testing::TestWithParam<double> {
+ public:
+  void SetUp() override {
+    GenerateData(GetParam());
+    ASSERT_NO_FATAL_FAILURE(
+        WriteTableToBuffer(expected_dense_, expected_dense_->num_rows() / 2,
+                           default_arrow_writer_properties(), &buffer_));
+
+    properties_ = default_arrow_reader_properties();
+  }
+
+  void GenerateData(double null_probability) {
+    constexpr int num_unique = 100;
+    constexpr int repeat = 10;
+    constexpr int64_t min_length = 2;
+    constexpr int64_t max_length = 10;
+    ::arrow::random::RandomArrayGenerator rag(0);
+    auto dense_array = rag.StringWithRepeats(repeat * num_unique, num_unique, min_length,
+                                             max_length, null_probability);
+    expected_dense_ = MakeSimpleTable(dense_array, /*nullable=*/true);
+
+    ::arrow::StringDictionaryBuilder builder(default_memory_pool());
+    const auto& string_array = static_cast<const ::arrow::StringArray&>(*dense_array);
+    ASSERT_OK(builder.AppendArray(string_array));
+
+    std::shared_ptr<::arrow::Array> dict_array;
+    ASSERT_OK(builder.Finish(&dict_array));
+    expected_dict_ = MakeSimpleTable(dict_array, /*nullable=*/true);
+
+    // TODO(hatemhelal): Figure out if we can use the following to init the expected_dict_
+    // Currently fails due to DataType mismatch for indices array.
+    //    Datum out;
+    //    FunctionContext ctx(default_memory_pool());
+    //    ASSERT_OK(DictionaryEncode(&ctx, Datum(dense_array), &out));
+    //    expected_dict_ = MakeSimpleTable(out.make_array(), /*nullable=*/true);
+  }
+
+  void TearDown() override {}
+
+  void CheckReadWholeFile(const Table& expected) {
+    std::unique_ptr<FileReader> reader;
+    ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer_),
+                                ::arrow::default_memory_pool(), properties_, &reader));
+
+    std::shared_ptr<Table> actual;
+    ASSERT_OK_NO_THROW(reader->ReadTable(&actual));
+    ::arrow::AssertTablesEqual(*actual, expected, /*same_chunk_layout=*/false);
+  }
+
+  static std::vector<double> null_probabilites() { return {0.0, 0.5, 1}; }
+
+ protected:
+  std::shared_ptr<Table> expected_dense_;
+  std::shared_ptr<Table> expected_dict_;
+  std::shared_ptr<Buffer> buffer_;
+  ArrowReaderProperties properties_;
+};
+
+TEST_P(TestArrowReadDictionary, ReadWholeFileDict) {
+  properties_.set_read_dictionary(0, true);
+  CheckReadWholeFile(*expected_dict_);
+}
+
+TEST_P(TestArrowReadDictionary, ReadWholeFileDense) {
+  properties_.set_read_dictionary(0, false);
+  CheckReadWholeFile(*expected_dense_);
+}
+
+INSTANTIATE_TEST_CASE_P(
+    ReadDictionary, TestArrowReadDictionary,
+    ::testing::ValuesIn(TestArrowReadDictionary::null_probabilites()));
+
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 61f5bb2..f891682 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -101,6 +101,11 @@ Status GetSingleChunk(const ChunkedArray& chunked, std::shared_ptr<Array>* out)
 
 }  // namespace
 
+ArrowReaderProperties default_arrow_reader_properties() {
+  static ArrowReaderProperties default_reader_props;
+  return default_reader_props;
+}
+
 // ----------------------------------------------------------------------
 // Iteration utilities
 
@@ -236,8 +241,9 @@ using FileColumnIteratorFactory =
 
 class FileReader::Impl {
  public:
-  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
-      : pool_(pool), reader_(std::move(reader)), use_threads_(false) {}
+  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
+       const ArrowReaderProperties& properties)
+      : pool_(pool), reader_(std::move(reader)), reader_properties_(properties) {}
 
   virtual ~Impl() {}
 
@@ -279,14 +285,21 @@ class FileReader::Impl {
 
   int num_columns() const { return reader_->metadata()->num_columns(); }
 
-  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
+  void set_use_threads(bool use_threads) {
+    reader_properties_.set_use_threads(use_threads);
+  }
 
   ParquetFileReader* reader() { return reader_.get(); }
 
+  std::vector<int> GetDictionaryIndices(const std::vector<int>& indices);
+  std::shared_ptr<::arrow::Schema> FixSchema(
+      const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
+      std::vector<std::shared_ptr<::arrow::Column>>& columns);
+
  private:
   MemoryPool* pool_;
   std::unique_ptr<ParquetFileReader> reader_;
-  bool use_threads_;
+  ArrowReaderProperties reader_properties_;
 };
 
 class ColumnReader::ColumnReaderImpl {
@@ -302,9 +315,10 @@ class ColumnReader::ColumnReaderImpl {
 // Reader implementation for primitive arrays
 class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
  public:
-  PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
+  PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input,
+                const bool read_dictionary)
       : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
-    record_reader_ = RecordReader::Make(descr_, pool_);
+    record_reader_ = RecordReader::Make(descr_, pool_, read_dictionary);
     Status s = NodeToField(*input_->descr()->schema_node(), &field_);
     DCHECK_OK(s);
     NextRowGroup();
@@ -358,17 +372,19 @@ class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
                       const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
 };
 
-FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
-    : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
+                       const ArrowReaderProperties& properties)
+    : impl_(new FileReader::Impl(pool, std::move(reader), properties)) {}
 
 FileReader::~FileReader() {}
 
 Status FileReader::Impl::GetColumn(int i, FileColumnIteratorFactory iterator_factory,
                                    std::unique_ptr<ColumnReader>* out) {
   std::unique_ptr<FileColumnIterator> input(iterator_factory(i, reader_.get()));
+  bool read_dict = reader_properties_.read_dictionary(i);
 
   std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
-      new PrimitiveImpl(pool_, std::move(input)));
+      new PrimitiveImpl(pool_, std::move(input), read_dict));
   *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
   return Status::OK();
 }
@@ -552,7 +568,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
     return Status::OK();
   };
 
-  if (use_threads_) {
+  if (reader_properties_.use_threads()) {
     std::vector<std::future<Status>> futures;
     auto pool = ::arrow::internal::GetCpuThreadPool();
     for (int i = 0; i < num_fields; i++) {
@@ -572,7 +588,15 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
     }
   }
 
-  *out = Table::Make(schema, columns);
+  auto dict_indices = GetDictionaryIndices(indices);
+
+  if (!dict_indices.empty()) {
+    schema = FixSchema(*schema, dict_indices, columns);
+  }
+
+  std::shared_ptr<Table> table = Table::Make(schema, columns);
+  RETURN_NOT_OK(table->Validate());
+  *out = table;
   return Status::OK();
 }
 
@@ -599,7 +623,7 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
     return Status::OK();
   };
 
-  if (use_threads_) {
+  if (reader_properties_.use_threads()) {
     std::vector<std::future<Status>> futures;
     auto pool = ::arrow::internal::GetCpuThreadPool();
     for (int i = 0; i < num_fields; i++) {
@@ -619,6 +643,12 @@ Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
     }
   }
 
+  auto dict_indices = GetDictionaryIndices(indices);
+
+  if (!dict_indices.empty()) {
+    schema = FixSchema(*schema, dict_indices, columns);
+  }
+
   std::shared_ptr<Table> table = Table::Make(schema, columns);
   RETURN_NOT_OK(table->Validate());
   *out = table;
@@ -664,6 +694,32 @@ Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
   return ReadRowGroup(i, indices, table);
 }
 
+std::vector<int> FileReader::Impl::GetDictionaryIndices(const std::vector<int>& indices) {
+  // Select the column indices that were read as DictionaryArray
+  std::vector<int> dict_indices(indices);
+  auto remove_func = [this](int i) { return !reader_properties_.read_dictionary(i); };
+  auto it = std::remove_if(dict_indices.begin(), dict_indices.end(), remove_func);
+  dict_indices.erase(it, dict_indices.end());
+  return dict_indices;
+}
+
+std::shared_ptr<::arrow::Schema> FileReader::Impl::FixSchema(
+    const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
+    std::vector<std::shared_ptr<::arrow::Column>>& columns) {
+  // Fix the schema with the actual DictionaryType that was read
+  auto fields = old_schema.fields();
+
+  for (int idx : dict_indices) {
+    auto name = columns[idx]->name();
+    auto dict_array = columns[idx]->data();
+    auto dict_field = std::make_shared<::arrow::Field>(name, dict_array->type());
+    fields[idx] = dict_field;
+    columns[idx] = std::make_shared<Column>(dict_field, dict_array);
+  }
+
+  return std::make_shared<::arrow::Schema>(fields, old_schema.metadata());
+}
+
 // Static ctor
 Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
                 MemoryPool* allocator, const ReaderProperties& props,
@@ -683,6 +739,18 @@ Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
                   reader);
 }
 
+Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+                ::arrow::MemoryPool* allocator, const ArrowReaderProperties& properties,
+                std::unique_ptr<FileReader>* reader) {
+  std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(file));
+  std::unique_ptr<ParquetReader> pq_reader;
+  PARQUET_CATCH_NOT_OK(
+      pq_reader = ParquetReader::Open(std::move(io_wrapper),
+                                      ::parquet::default_reader_properties(), nullptr));
+  reader->reset(new FileReader(allocator, std::move(pq_reader), properties));
+  return Status::OK();
+}
+
 Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
   FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) {
     return new AllRowGroupsIterator(i, reader);
@@ -1138,15 +1206,6 @@ struct TransferFunctor<
   Status operator()(RecordReader* reader, MemoryPool* pool,
                     const std::shared_ptr<::arrow::DataType>& type, Datum* out) {
     std::vector<std::shared_ptr<Array>> chunks = reader->GetBuilderChunks();
-
-    if (type->id() == ::arrow::Type::STRING) {
-      // Convert from BINARY type to STRING
-      for (size_t i = 0; i < chunks.size(); ++i) {
-        auto new_data = chunks[i]->data()->Copy();
-        new_data->type = type;
-        chunks[i] = ::arrow::MakeArray(new_data);
-      }
-    }
     *out = std::make_shared<ChunkedArray>(chunks);
     return Status::OK();
   }
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 7ef21fd..52fcec8 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -20,6 +20,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <unordered_set>
 #include <vector>
 
 #include "parquet/util/visibility.h"
@@ -51,6 +52,42 @@ class ColumnChunkReader;
 class ColumnReader;
 class RowGroupReader;
 
+static constexpr bool DEFAULT_USE_THREADS = false;
+
+/// EXPERIMENTAL: Properties for configuring FileReader behavior.
+class PARQUET_EXPORT ArrowReaderProperties {
+ public:
+  explicit ArrowReaderProperties(bool use_threads = DEFAULT_USE_THREADS)
+      : use_threads_(use_threads), read_dict_indices_() {}
+
+  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
+
+  bool use_threads() const { return use_threads_; }
+
+  void set_read_dictionary(int column_index, bool read_dict) {
+    if (read_dict) {
+      read_dict_indices_.insert(column_index);
+    } else {
+      read_dict_indices_.erase(column_index);
+    }
+  }
+  bool read_dictionary(int column_index) const {
+    if (read_dict_indices_.find(column_index) != read_dict_indices_.end()) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+ private:
+  bool use_threads_;
+  std::unordered_set<int> read_dict_indices_;
+};
+
+/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
+PARQUET_EXPORT
+ArrowReaderProperties default_arrow_reader_properties();
+
 // Arrow read adapter class for deserializing Parquet files as Arrow row
 // batches.
 //
@@ -109,7 +146,8 @@ class RowGroupReader;
 // arrays
 class PARQUET_EXPORT FileReader {
  public:
-  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+  FileReader(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader,
+             const ArrowReaderProperties& properties = default_arrow_reader_properties());
 
   // Since the distribution of columns amongst a Parquet file's row groups may
   // be uneven (the number of values in each column chunk can be different), we
@@ -291,7 +329,7 @@ class PARQUET_EXPORT ColumnReader {
 };
 
 // Helper function to create a file reader from an implementation of an Arrow
-// readable file
+// random access file
 //
 // metadata : separately-computed file metadata, can be nullptr
 PARQUET_EXPORT
@@ -306,6 +344,12 @@ PARQUET_EXPORT
                          ::arrow::MemoryPool* allocator,
                          std::unique_ptr<FileReader>* reader);
 
+PARQUET_EXPORT
+::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+                         ::arrow::MemoryPool* allocator,
+                         const ArrowReaderProperties& properties,
+                         std::unique_ptr<FileReader>* reader);
+
 }  // namespace arrow
 
 }  // namespace parquet
diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc
index c800f36..42334b9 100644
--- a/cpp/src/parquet/arrow/record_reader.cc
+++ b/cpp/src/parquet/arrow/record_reader.cc
@@ -449,35 +449,16 @@ class RecordReader::RecordReaderImpl {
 };
 
 template <typename DType>
-struct RecordReaderTraits {
-  using BuilderType = ::arrow::ArrayBuilder;
-};
-
-template <>
-struct RecordReaderTraits<ByteArrayType> {
-  using BuilderType = ::arrow::internal::ChunkedBinaryBuilder;
-};
-
-template <>
-struct RecordReaderTraits<FLBAType> {
-  using BuilderType = ::arrow::FixedSizeBinaryBuilder;
-};
-
-template <typename DType>
 class TypedRecordReader : public RecordReader::RecordReaderImpl {
  public:
   using T = typename DType::c_type;
 
-  using BuilderType = typename RecordReaderTraits<DType>::BuilderType;
-
   TypedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
-      : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) {
-    InitializeBuilder();
-  }
+      : RecordReader::RecordReaderImpl(descr, pool), current_decoder_(nullptr) {}
 
   void ResetDecoders() override { decoders_.clear(); }
 
-  inline void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
+  virtual void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
     uint8_t* valid_bits = valid_bits_->mutable_data();
     const int64_t valid_bits_offset = values_written_;
 
@@ -487,7 +468,7 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
     DCHECK_EQ(num_decoded, values_with_nulls);
   }
 
-  inline void ReadValuesDense(int64_t values_to_read) {
+  virtual void ReadValuesDense(int64_t values_to_read) {
     int64_t num_decoded =
         current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
     DCHECK_EQ(num_decoded, values_to_read);
@@ -568,18 +549,17 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
     throw ParquetException("GetChunks only implemented for binary types");
   }
 
- private:
+ protected:
   using DecoderType = typename EncodingTraits<DType>::Decoder;
 
+  DecoderType* current_decoder_;
+
+ private:
   // Map of encoding type to the respective decoder object. For example, a
   // column chunk's data pages may include both dictionary-encoded and
   // plain-encoded data.
   std::unordered_map<int, std::unique_ptr<DecoderType>> decoders_;
 
-  std::unique_ptr<BuilderType> builder_;
-
-  DecoderType* current_decoder_;
-
   // Initialize repetition and definition level decoders on the next data page.
   int64_t InitializeLevelDecoders(const DataPage& page,
                                   Encoding::type repetition_level_encoding,
@@ -590,103 +570,143 @@ class TypedRecordReader : public RecordReader::RecordReaderImpl {
   // Advance to the next data page
   bool ReadNewPage() override;
 
-  void InitializeBuilder() {}
-
   void ConfigureDictionary(const DictionaryPage* page);
 };
 
-// TODO(wesm): Implement these to some satisfaction
-template <>
-void TypedRecordReader<Int96Type>::DebugPrintState() {}
+class FLBARecordReader : public TypedRecordReader<FLBAType> {
+ public:
+  FLBARecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
+      : TypedRecordReader<FLBAType>(descr, pool), builder_(nullptr) {
+    DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
+    int byte_width = descr_->type_length();
+    std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
+    builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_));
+  }
 
-template <>
-void TypedRecordReader<ByteArrayType>::DebugPrintState() {}
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    std::shared_ptr<::arrow::Array> chunk;
+    PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
+    return ::arrow::ArrayVector({chunk});
+  }
 
-template <>
-void TypedRecordReader<FLBAType>::DebugPrintState() {}
+  void ReadValuesDense(int64_t values_to_read) override {
+    auto values = ValuesHead<FLBA>();
+    int64_t num_decoded =
+        current_decoder_->Decode(values, static_cast<int>(values_to_read));
+    DCHECK_EQ(num_decoded, values_to_read);
 
-template <>
-void TypedRecordReader<ByteArrayType>::InitializeBuilder() {
-  // Maximum of 16MB chunks
-  constexpr int32_t kBinaryChunksize = 1 << 24;
-  DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
-  builder_.reset(new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_));
-}
+    for (int64_t i = 0; i < num_decoded; i++) {
+      PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
+    }
+    ResetValues();
+  }
 
-template <>
-void TypedRecordReader<FLBAType>::InitializeBuilder() {
-  DCHECK_EQ(descr_->physical_type(), Type::FIXED_LEN_BYTE_ARRAY);
-  int byte_width = descr_->type_length();
-  std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
-  builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool_));
-}
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    uint8_t* valid_bits = valid_bits_->mutable_data();
+    const int64_t valid_bits_offset = values_written_;
+    auto values = ValuesHead<FLBA>();
 
-template <>
-::arrow::ArrayVector TypedRecordReader<ByteArrayType>::GetBuilderChunks() {
-  ::arrow::ArrayVector chunks;
-  PARQUET_THROW_NOT_OK(builder_->Finish(&chunks));
-  return chunks;
-}
+    int64_t num_decoded = current_decoder_->DecodeSpaced(
+        values, static_cast<int>(values_to_read), static_cast<int>(null_count),
+        valid_bits, valid_bits_offset);
+    DCHECK_EQ(num_decoded, values_to_read);
 
-template <>
-::arrow::ArrayVector TypedRecordReader<FLBAType>::GetBuilderChunks() {
-  std::shared_ptr<::arrow::Array> chunk;
-  PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
-  return ::arrow::ArrayVector({chunk});
-}
+    for (int64_t i = 0; i < num_decoded; i++) {
+      if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+        PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
+      } else {
+        PARQUET_THROW_NOT_OK(builder_->AppendNull());
+      }
+    }
+    ResetValues();
+  }
 
-template <>
-inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
-  int64_t num_decoded = current_decoder_->DecodeArrowNonNull(
-      static_cast<int>(values_to_read), builder_.get());
-  DCHECK_EQ(num_decoded, values_to_read);
-  ResetValues();
-}
+ private:
+  std::unique_ptr<::arrow::FixedSizeBinaryBuilder> builder_;
+};
 
-template <>
-inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t values_to_read) {
-  auto values = ValuesHead<FLBA>();
-  int64_t num_decoded =
-      current_decoder_->Decode(values, static_cast<int>(values_to_read));
-  DCHECK_EQ(num_decoded, values_to_read);
+class ByteArrayChunkedRecordReader : public TypedRecordReader<ByteArrayType> {
+ public:
+  ByteArrayChunkedRecordReader(const ColumnDescriptor* descr, ::arrow::MemoryPool* pool)
+      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(nullptr) {
+    // Maximum of 16MB chunks
+    constexpr int32_t kBinaryChunksize = 1 << 24;
+    DCHECK_EQ(descr_->physical_type(), Type::BYTE_ARRAY);
+    if (descr_->logical_type() == LogicalType::UTF8) {
+      builder_.reset(
+          new ::arrow::internal::ChunkedStringBuilder(kBinaryChunksize, pool_));
+    } else {
+      builder_.reset(
+          new ::arrow::internal::ChunkedBinaryBuilder(kBinaryChunksize, pool_));
+    }
+  }
 
-  for (int64_t i = 0; i < num_decoded; i++) {
-    PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr));
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    ::arrow::ArrayVector chunks;
+    PARQUET_THROW_NOT_OK(builder_->Finish(&chunks));
+    return chunks;
   }
-  ResetValues();
-}
 
+  void ReadValuesDense(int64_t values_to_read) override {
+    int64_t num_decoded = current_decoder_->DecodeArrowNonNull(
+        static_cast<int>(values_to_read), builder_.get());
+    DCHECK_EQ(num_decoded, values_to_read);
+    ResetValues();
+  }
+
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    int64_t num_decoded = current_decoder_->DecodeArrow(
+        static_cast<int>(values_to_read), static_cast<int>(null_count),
+        valid_bits_->mutable_data(), values_written_, builder_.get());
+    DCHECK_EQ(num_decoded, values_to_read);
+    ResetValues();
+  }
+
+ private:
+  std::unique_ptr<::arrow::internal::ChunkedBinaryBuilder> builder_;
+};
+
+template <typename BuilderType>
+class ByteArrayDictionaryRecordReader : public TypedRecordReader<ByteArrayType> {
+ public:
+  ByteArrayDictionaryRecordReader(const ColumnDescriptor* descr,
+                                  ::arrow::MemoryPool* pool)
+      : TypedRecordReader<ByteArrayType>(descr, pool), builder_(new BuilderType(pool)) {}
+
+  ::arrow::ArrayVector GetBuilderChunks() override {
+    std::shared_ptr<::arrow::Array> chunk;
+    PARQUET_THROW_NOT_OK(builder_->Finish(&chunk));
+    return ::arrow::ArrayVector({chunk});
+  }
+
+  void ReadValuesDense(int64_t values_to_read) override {
+    int64_t num_decoded = current_decoder_->DecodeArrowNonNull(
+        static_cast<int>(values_to_read), builder_.get());
+    DCHECK_EQ(num_decoded, values_to_read);
+    ResetValues();
+  }
+
+  void ReadValuesSpaced(int64_t values_to_read, int64_t null_count) override {
+    int64_t num_decoded = current_decoder_->DecodeArrow(
+        static_cast<int>(values_to_read), static_cast<int>(null_count),
+        valid_bits_->mutable_data(), values_written_, builder_.get());
+    DCHECK_EQ(num_decoded, values_to_read);
+    ResetValues();
+  }
+
+ private:
+  std::unique_ptr<BuilderType> builder_;
+};
+
+// TODO(wesm): Implement these to some satisfaction
 template <>
-inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t values_to_read,
-                                                               int64_t null_count) {
-  int64_t num_decoded = current_decoder_->DecodeArrow(
-      static_cast<int>(values_to_read), static_cast<int>(null_count),
-      valid_bits_->mutable_data(), values_written_, builder_.get());
-  DCHECK_EQ(num_decoded, values_to_read);
-  ResetValues();
-}
+void TypedRecordReader<Int96Type>::DebugPrintState() {}
 
 template <>
-inline void TypedRecordReader<FLBAType>::ReadValuesSpaced(int64_t values_to_read,
-                                                          int64_t null_count) {
-  uint8_t* valid_bits = valid_bits_->mutable_data();
-  const int64_t valid_bits_offset = values_written_;
- auto values = ValuesHead<FLBA>(); - - int64_t num_decoded = current_decoder_->DecodeSpaced( - values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits, - valid_bits_offset); - DCHECK_EQ(num_decoded, values_to_read); - - for (int64_t i = 0; i < num_decoded; i++) { - if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) { - PARQUET_THROW_NOT_OK(builder_->Append(values[i].ptr)); - } else { - PARQUET_THROW_NOT_OK(builder_->AppendNull()); - } - } - ResetValues(); -} +void TypedRecordReader<ByteArrayType>::DebugPrintState() {} + +template <> +void TypedRecordReader<FLBAType>::DebugPrintState() {} template <typename DType> inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage* page) { @@ -845,8 +865,27 @@ bool TypedRecordReader<DType>::ReadNewPage() { return true; } +std::shared_ptr<RecordReader> RecordReader::MakeByteArrayRecordReader( + const ColumnDescriptor* descr, arrow::MemoryPool* pool, bool read_dictionary) { + if (read_dictionary) { + if (descr->logical_type() == LogicalType::UTF8) { + using Builder = ::arrow::StringDictionaryBuilder; + return std::shared_ptr<RecordReader>( + new RecordReader(new ByteArrayDictionaryRecordReader<Builder>(descr, pool))); + } else { + using Builder = ::arrow::BinaryDictionaryBuilder; + return std::shared_ptr<RecordReader>( + new RecordReader(new ByteArrayDictionaryRecordReader<Builder>(descr, pool))); + } + } else { + return std::shared_ptr<RecordReader>( + new RecordReader(new ByteArrayChunkedRecordReader(descr, pool))); + } +} + std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr, - MemoryPool* pool) { + MemoryPool* pool, + const bool read_dictionary) { switch (descr->physical_type()) { case Type::BOOLEAN: return std::shared_ptr<RecordReader>( @@ -867,11 +906,10 @@ std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr, return std::shared_ptr<RecordReader>( new RecordReader(new TypedRecordReader<DoubleType>(descr, 
pool))); case Type::BYTE_ARRAY: - return std::shared_ptr<RecordReader>( - new RecordReader(new TypedRecordReader<ByteArrayType>(descr, pool))); + return RecordReader::MakeByteArrayRecordReader(descr, pool, read_dictionary); case Type::FIXED_LEN_BYTE_ARRAY: return std::shared_ptr<RecordReader>( - new RecordReader(new TypedRecordReader<FLBAType>(descr, pool))); + new RecordReader(new FLBARecordReader(descr, pool))); default: { // PARQUET-1481: This can occur if the file is corrupt std::stringstream ss; diff --git a/cpp/src/parquet/arrow/record_reader.h b/cpp/src/parquet/arrow/record_reader.h index cc932c2..c999dd0 100644 --- a/cpp/src/parquet/arrow/record_reader.h +++ b/cpp/src/parquet/arrow/record_reader.h @@ -51,7 +51,8 @@ class RecordReader { static std::shared_ptr<RecordReader> Make( const ColumnDescriptor* descr, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), + const bool read_dictionary = false); virtual ~RecordReader(); @@ -111,6 +112,10 @@ class RecordReader { private: std::unique_ptr<RecordReaderImpl> impl_; explicit RecordReader(RecordReaderImpl* impl); + + static std::shared_ptr<RecordReader> MakeByteArrayRecordReader( + const ColumnDescriptor* descr, ::arrow::MemoryPool* pool, + const bool read_dictionary); }; } // namespace internal diff --git a/cpp/src/parquet/encoding-benchmark.cc b/cpp/src/parquet/encoding-benchmark.cc index 8031aeb..0e88190 100644 --- a/cpp/src/parquet/encoding-benchmark.cc +++ b/cpp/src/parquet/encoding-benchmark.cc @@ -17,13 +17,29 @@ #include "benchmark/benchmark.h" +#include "arrow/array.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_dict.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/testing/util.h" +#include "arrow/type.h" + #include "parquet/encoding.h" #include "parquet/schema.h" #include "parquet/util/memory.h" +#include <random> + using arrow::default_memory_pool; using 
arrow::MemoryPool; +namespace { +// The min/max number of values used to drive each family of encoding benchmarks +constexpr int MIN_RANGE = 1024; +constexpr int MAX_RANGE = 65536; +} // namespace + namespace parquet { using schema::PrimitiveNode; @@ -39,14 +55,14 @@ static void BM_PlainEncodingBoolean(benchmark::State& state) { auto encoder = MakeEncoder(Type::BOOLEAN, Encoding::PLAIN); auto typed_encoder = dynamic_cast<BooleanEncoder*>(encoder.get()); - while (state.KeepRunning()) { + for (auto _ : state) { typed_encoder->Put(values, static_cast<int>(values.size())); typed_encoder->FlushValues(); } state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(bool)); } -BENCHMARK(BM_PlainEncodingBoolean)->Range(1024, 65536); +BENCHMARK(BM_PlainEncodingBoolean)->Range(MIN_RANGE, MAX_RANGE); static void BM_PlainDecodingBoolean(benchmark::State& state) { std::vector<bool> values(state.range(0), true); @@ -56,7 +72,7 @@ static void BM_PlainDecodingBoolean(benchmark::State& state) { typed_encoder->Put(values, static_cast<int>(values.size())); std::shared_ptr<Buffer> buf = encoder->FlushValues(); - while (state.KeepRunning()) { + for (auto _ : state) { auto decoder = MakeTypedDecoder<BooleanType>(Encoding::PLAIN); decoder->SetData(static_cast<int>(values.size()), buf->data(), static_cast<int>(buf->size())); @@ -67,19 +83,19 @@ static void BM_PlainDecodingBoolean(benchmark::State& state) { delete[] output; } -BENCHMARK(BM_PlainDecodingBoolean)->Range(1024, 65536); +BENCHMARK(BM_PlainDecodingBoolean)->Range(MIN_RANGE, MAX_RANGE); static void BM_PlainEncodingInt64(benchmark::State& state) { std::vector<int64_t> values(state.range(0), 64); auto encoder = MakeTypedEncoder<Int64Type>(Encoding::PLAIN); - while (state.KeepRunning()) { + for (auto _ : state) { encoder->Put(values.data(), static_cast<int>(values.size())); encoder->FlushValues(); } state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int64_t)); } 
-BENCHMARK(BM_PlainEncodingInt64)->Range(1024, 65536); +BENCHMARK(BM_PlainEncodingInt64)->Range(MIN_RANGE, MAX_RANGE); static void BM_PlainDecodingInt64(benchmark::State& state) { std::vector<int64_t> values(state.range(0), 64); @@ -87,7 +103,7 @@ static void BM_PlainDecodingInt64(benchmark::State& state) { encoder->Put(values.data(), static_cast<int>(values.size())); std::shared_ptr<Buffer> buf = encoder->FlushValues(); - while (state.KeepRunning()) { + for (auto _ : state) { auto decoder = MakeTypedDecoder<Int64Type>(Encoding::PLAIN); decoder->SetData(static_cast<int>(values.size()), buf->data(), static_cast<int>(buf->size())); @@ -96,7 +112,7 @@ static void BM_PlainDecodingInt64(benchmark::State& state) { state.SetBytesProcessed(state.iterations() * state.range(0) * sizeof(int64_t)); } -BENCHMARK(BM_PlainDecodingInt64)->Range(1024, 65536); +BENCHMARK(BM_PlainDecodingInt64)->Range(MIN_RANGE, MAX_RANGE); template <typename Type> static void DecodeDict(std::vector<typename Type::c_type>& values, @@ -126,7 +142,7 @@ static void DecodeDict(std::vector<typename Type::c_type>& values, PARQUET_THROW_NOT_OK(indices->Resize(actual_bytes)); - while (state.KeepRunning()) { + for (auto _ : state) { auto dict_decoder = MakeTypedDecoder<Type>(Encoding::PLAIN, descr.get()); dict_decoder->SetData(dict_traits->num_entries(), dict_buffer->data(), static_cast<int>(dict_buffer->size())); @@ -148,7 +164,7 @@ static void BM_DictDecodingInt64_repeats(benchmark::State& state) { DecodeDict<Type>(values, state); } -BENCHMARK(BM_DictDecodingInt64_repeats)->Range(1024, 65536); +BENCHMARK(BM_DictDecodingInt64_repeats)->Range(MIN_RANGE, MAX_RANGE); static void BM_DictDecodingInt64_literals(benchmark::State& state) { typedef Int64Type Type; @@ -161,6 +177,192 @@ static void BM_DictDecodingInt64_literals(benchmark::State& state) { DecodeDict<Type>(values, state); } -BENCHMARK(BM_DictDecodingInt64_literals)->Range(1024, 65536); +BENCHMARK(BM_DictDecodingInt64_literals)->Range(MIN_RANGE, 
MAX_RANGE); + +// ---------------------------------------------------------------------- +// Shared benchmarks for decoding using arrow builders +class BenchmarkDecodeArrow : public ::benchmark::Fixture { + public: + void SetUp(const ::benchmark::State& state) override { + num_values_ = static_cast<int>(state.range()); + InitDataInputs(); + DoEncodeData(); + } + + void TearDown(const ::benchmark::State& state) override {} + + void InitDataInputs() { + // Generate a random string dictionary without any nulls so that this dataset can be + // used for benchmarking the DecodeArrowNonNull API + constexpr int repeat_factor = 8; + constexpr int64_t min_length = 2; + constexpr int64_t max_length = 10; + ::arrow::random::RandomArrayGenerator rag(0); + input_array_ = rag.StringWithRepeats(num_values_, num_values_ / repeat_factor, + min_length, max_length, /*null_probability=*/0); + valid_bits_ = input_array_->null_bitmap()->data(); + values_ = std::vector<ByteArray>(); + values_.reserve(num_values_); + total_size_ = 0; + const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(*input_array_); + + for (int64_t i = 0; i < binary_array.length(); i++) { + auto view = binary_array.GetView(i); + values_.emplace_back(static_cast<uint32_t>(view.length()), + reinterpret_cast<const uint8_t*>(view.data())); + total_size_ += view.length(); + } + } + + virtual void DoEncodeData() = 0; + + virtual std::unique_ptr<ByteArrayDecoder> InitializeDecoder() = 0; + + template <typename BuilderType> + std::unique_ptr<BuilderType> CreateBuilder(); + + template <typename BuilderType> + void DecodeArrowBenchmark(benchmark::State& state) { + for (auto _ : state) { + auto decoder = InitializeDecoder(); + auto builder = CreateBuilder<BuilderType>(); + decoder->DecodeArrow(num_values_, 0, valid_bits_, 0, builder.get()); + } + + state.SetBytesProcessed(state.iterations() * total_size_); + } + + template <typename BuilderType> + void DecodeArrowNonNullBenchmark(benchmark::State& state) { + for 
(auto _ : state) { + auto decoder = InitializeDecoder(); + auto builder = CreateBuilder<BuilderType>(); + decoder->DecodeArrowNonNull(num_values_, builder.get()); + } + + state.SetBytesProcessed(state.iterations() * total_size_); + } + + protected: + int num_values_; + std::shared_ptr<::arrow::Array> input_array_; + uint64_t total_size_; + std::vector<ByteArray> values_; + const uint8_t* valid_bits_; + std::shared_ptr<Buffer> buffer_; +}; + +using ::arrow::BinaryDictionaryBuilder; +using ::arrow::internal::ChunkedBinaryBuilder; + +template <> +std::unique_ptr<ChunkedBinaryBuilder> BenchmarkDecodeArrow::CreateBuilder() { + int chunk_size = static_cast<int>(buffer_->size()); + return std::unique_ptr<ChunkedBinaryBuilder>( + new ChunkedBinaryBuilder(chunk_size, default_memory_pool())); +} + +template <> +std::unique_ptr<BinaryDictionaryBuilder> BenchmarkDecodeArrow::CreateBuilder() { + return std::unique_ptr<BinaryDictionaryBuilder>( + new BinaryDictionaryBuilder(default_memory_pool())); +} + +// ---------------------------------------------------------------------- +// Benchmark Decoding from Plain Encoding +class BM_PlainDecodingByteArray : public BenchmarkDecodeArrow { + public: + void DoEncodeData() override { + auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN); + encoder->Put(values_.data(), num_values_); + buffer_ = encoder->FlushValues(); + } + + std::unique_ptr<ByteArrayDecoder> InitializeDecoder() override { + auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN); + decoder->SetData(num_values_, buffer_->data(), static_cast<int>(buffer_->size())); + return decoder; + } +}; + +BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrow_Dense) +(benchmark::State& state) { DecodeArrowBenchmark<ChunkedBinaryBuilder>(state); } +BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrow_Dense) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dense) +(benchmark::State& state) { 
DecodeArrowNonNullBenchmark<ChunkedBinaryBuilder>(state); } +BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dense) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrow_Dict) +(benchmark::State& state) { DecodeArrowBenchmark<BinaryDictionaryBuilder>(state); } +BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrow_Dict) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dict) +(benchmark::State& state) { DecodeArrowNonNullBenchmark<BinaryDictionaryBuilder>(state); } +BENCHMARK_REGISTER_F(BM_PlainDecodingByteArray, DecodeArrowNonNull_Dict) + ->Range(MIN_RANGE, MAX_RANGE); + +// ---------------------------------------------------------------------- +// Benchmark Decoding from Dictionary Encoding +class BM_DictDecodingByteArray : public BenchmarkDecodeArrow { + public: + void DoEncodeData() override { + auto node = schema::ByteArray("name"); + descr_ = std::unique_ptr<ColumnDescriptor>(new ColumnDescriptor(node, 0, 0)); + auto encoder = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN, + /*use_dictionary=*/true, descr_.get()); + ASSERT_NO_THROW(encoder->Put(values_.data(), num_values_)); + buffer_ = encoder->FlushValues(); + + auto dict_encoder = dynamic_cast<DictEncoder<ByteArrayType>*>(encoder.get()); + ASSERT_NE(dict_encoder, nullptr); + dict_buffer_ = + AllocateBuffer(default_memory_pool(), dict_encoder->dict_encoded_size()); + dict_encoder->WriteDict(dict_buffer_->mutable_data()); + num_dict_entries_ = dict_encoder->num_entries(); + } + + std::unique_ptr<ByteArrayDecoder> InitializeDecoder() override { + auto decoder = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN, descr_.get()); + decoder->SetData(num_dict_entries_, dict_buffer_->data(), + static_cast<int>(dict_buffer_->size())); + auto dict_decoder = MakeDictDecoder<ByteArrayType>(descr_.get()); + dict_decoder->SetDict(decoder.get()); + dict_decoder->SetData(num_values_, buffer_->data(), + 
static_cast<int>(buffer_->size())); + return std::unique_ptr<ByteArrayDecoder>( + dynamic_cast<ByteArrayDecoder*>(dict_decoder.release())); + } + + protected: + std::unique_ptr<ColumnDescriptor> descr_; + std::unique_ptr<DictDecoder<ByteArrayType>> dict_decoder_; + std::shared_ptr<Buffer> dict_buffer_; + int num_dict_entries_; +}; + +BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrow_Dense)(benchmark::State& state) { + DecodeArrowBenchmark<ChunkedBinaryBuilder>(state); +} +BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrow_Dense) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dense) +(benchmark::State& state) { DecodeArrowNonNullBenchmark<ChunkedBinaryBuilder>(state); } +BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dense) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrow_Dict) +(benchmark::State& state) { DecodeArrowBenchmark<BinaryDictionaryBuilder>(state); } +BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrow_Dict) + ->Range(MIN_RANGE, MAX_RANGE); + +BENCHMARK_DEFINE_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dict) +(benchmark::State& state) { DecodeArrowNonNullBenchmark<BinaryDictionaryBuilder>(state); } +BENCHMARK_REGISTER_F(BM_DictDecodingByteArray, DecodeArrowNonNull_Dict) + ->Range(MIN_RANGE, MAX_RANGE); } // namespace parquet diff --git a/cpp/src/parquet/encoding-test.cc b/cpp/src/parquet/encoding-test.cc index 28d9812..4ec537a 100644 --- a/cpp/src/parquet/encoding-test.cc +++ b/cpp/src/parquet/encoding-test.cc @@ -22,6 +22,12 @@ #include <string> #include <vector> +#include "arrow/array.h" +#include "arrow/compute/api.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/random.h" +#include "arrow/testing/util.h" +#include "arrow/type.h" #include "arrow/util/bit-util.h" #include "parquet/encoding.h" @@ -36,6 +42,13 @@ using arrow::MemoryPool; using std::string; using std::vector; +// TODO(hatemhelal): 
investigate whether this can be replaced with GTEST_SKIP in a future +// gtest release that contains https://github.com/google/googletest/pull/1544 +#define SKIP_TEST_IF(condition) \ + if (condition) { \ + return; \ + } + namespace parquet { namespace test { @@ -314,6 +327,258 @@ TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) { ASSERT_THROW(MakeDictDecoder<BooleanType>(nullptr), ParquetException); } +// ---------------------------------------------------------------------- +// Shared arrow builder decode tests +template <typename T> +struct BuilderTraits {}; + +template <> +struct BuilderTraits<::arrow::BinaryType> { + using DenseArrayBuilder = ::arrow::internal::ChunkedBinaryBuilder; + using DictArrayBuilder = ::arrow::BinaryDictionaryBuilder; +}; + +template <> +struct BuilderTraits<::arrow::StringType> { + using DenseArrayBuilder = ::arrow::internal::ChunkedStringBuilder; + using DictArrayBuilder = ::arrow::StringDictionaryBuilder; +}; + +template <typename DType> +class TestArrowBuilderDecoding : public ::testing::Test { + public: + using DenseBuilder = typename BuilderTraits<DType>::DenseArrayBuilder; + using DictBuilder = typename BuilderTraits<DType>::DictArrayBuilder; + + void SetUp() override { null_probabilities_ = {0.0, 0.5, 1.0}; } + void TearDown() override {} + + void InitTestCase(double null_probability) { + GenerateInputData(null_probability); + SetupEncoderDecoder(); + } + + void GenerateInputData(double null_probability) { + constexpr int num_unique = 100; + constexpr int repeat = 10; + constexpr int64_t min_length = 2; + constexpr int64_t max_length = 10; + ::arrow::random::RandomArrayGenerator rag(0); + expected_dense_ = rag.StringWithRepeats(repeat * num_unique, num_unique, min_length, + max_length, null_probability); + + std::shared_ptr<::arrow::DataType> data_type = std::make_shared<DType>(); + + if (data_type->id() == ::arrow::BinaryType::type_id) { + // TODO(hatemhelal): this is a kludge. 
Probably best to extend the + // RandomArrayGenerator to also generate BinaryType arrays. + auto data = expected_dense_->data()->Copy(); + data->type = data_type; + expected_dense_ = std::make_shared<::arrow::BinaryArray>(data); + } + + num_values_ = static_cast<int>(expected_dense_->length()); + null_count_ = static_cast<int>(expected_dense_->null_count()); + valid_bits_ = expected_dense_->null_bitmap()->data(); + + auto builder = CreateDictBuilder(); + ASSERT_OK(builder->AppendArray(*expected_dense_)); + ASSERT_OK(builder->Finish(&expected_dict_)); + + // Initialize input_data_ for the encoder from the expected_array_ values + const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(*expected_dense_); + input_data_.reserve(binary_array.length()); + + for (int64_t i = 0; i < binary_array.length(); ++i) { + auto view = binary_array.GetView(i); + input_data_[i] = {static_cast<uint32_t>(view.length()), + reinterpret_cast<const uint8_t*>(view.data())}; + } + } + + std::unique_ptr<DenseBuilder> CreateDenseBuilder() { + // Use same default chunk size of 16MB as used in ByteArrayChunkedRecordReader + constexpr int32_t kChunkSize = 1 << 24; + return std::unique_ptr<DenseBuilder>( + new DenseBuilder(kChunkSize, default_memory_pool())); + } + + std::unique_ptr<DictBuilder> CreateDictBuilder() { + return std::unique_ptr<DictBuilder>(new DictBuilder(default_memory_pool())); + } + + // Setup encoder/decoder pair for testing with + virtual void SetupEncoderDecoder() = 0; + + template <typename Builder> + void CheckDense(int actual_num_values, Builder& builder) { + ASSERT_EQ(actual_num_values, num_values_); + ::arrow::ArrayVector actual_vec; + ASSERT_OK(builder.Finish(&actual_vec)); + ASSERT_EQ(actual_vec.size(), 1); + ASSERT_ARRAYS_EQUAL(*actual_vec[0], *expected_dense_); + } + + template <typename Builder> + void CheckDict(int actual_num_values, Builder& builder) { + ASSERT_EQ(actual_num_values, num_values_); + std::shared_ptr<::arrow::Array> actual; + 
ASSERT_OK(builder.Finish(&actual)); + ASSERT_ARRAYS_EQUAL(*actual, *expected_dict_); + } + + void CheckDecodeArrowUsingDenseBuilder() { + for (auto np : null_probabilities_) { + InitTestCase(np); + auto builder = CreateDenseBuilder(); + auto actual_num_values = + decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, builder.get()); + CheckDense(actual_num_values, *builder); + } + } + + void CheckDecodeArrowUsingDictBuilder() { + for (auto np : null_probabilities_) { + InitTestCase(np); + auto builder = CreateDictBuilder(); + auto actual_num_values = + decoder_->DecodeArrow(num_values_, null_count_, valid_bits_, 0, builder.get()); + CheckDict(actual_num_values, *builder); + } + } + + void CheckDecodeArrowNonNullUsingDenseBuilder() { + for (auto np : null_probabilities_) { + InitTestCase(np); + SKIP_TEST_IF(null_count_ > 0) + auto builder = CreateDenseBuilder(); + auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, builder.get()); + CheckDense(actual_num_values, *builder); + } + } + + void CheckDecodeArrowNonNullUsingDictBuilder() { + for (auto np : null_probabilities_) { + InitTestCase(np); + SKIP_TEST_IF(null_count_ > 0) + auto builder = CreateDictBuilder(); + auto actual_num_values = decoder_->DecodeArrowNonNull(num_values_, builder.get()); + CheckDict(actual_num_values, *builder); + } + } + + protected: + std::vector<double> null_probabilities_; + std::shared_ptr<::arrow::Array> expected_dict_; + std::shared_ptr<::arrow::Array> expected_dense_; + int num_values_; + int null_count_; + vector<ByteArray> input_data_; + const uint8_t* valid_bits_; + std::unique_ptr<ByteArrayEncoder> encoder_; + std::unique_ptr<ByteArrayDecoder> decoder_; + std::shared_ptr<Buffer> buffer_; +}; + +#define TEST_ARROW_BUILDER_BASE_MEMBERS() \ + using TestArrowBuilderDecoding<DType>::encoder_; \ + using TestArrowBuilderDecoding<DType>::decoder_; \ + using TestArrowBuilderDecoding<DType>::input_data_; \ + using TestArrowBuilderDecoding<DType>::valid_bits_; \ + 
using TestArrowBuilderDecoding<DType>::num_values_; \ + using TestArrowBuilderDecoding<DType>::buffer_ + +template <typename DType> +class PlainEncoding : public TestArrowBuilderDecoding<DType> { + public: + void SetupEncoderDecoder() override { + encoder_ = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN); + decoder_ = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN); + ASSERT_NO_THROW(encoder_->PutSpaced(input_data_.data(), num_values_, valid_bits_, 0)); + buffer_ = encoder_->FlushValues(); + decoder_->SetData(num_values_, buffer_->data(), static_cast<int>(buffer_->size())); + } + + protected: + TEST_ARROW_BUILDER_BASE_MEMBERS(); +}; + +using BuilderArrayTypes = ::testing::Types<::arrow::BinaryType, ::arrow::StringType>; +// using BuilderArrayTypes = ::testing::Types<::arrow::StringType>; +TYPED_TEST_CASE(PlainEncoding, BuilderArrayTypes); + +TYPED_TEST(PlainEncoding, CheckDecodeArrowUsingDenseBuilder) { + this->CheckDecodeArrowUsingDenseBuilder(); +} + +TYPED_TEST(PlainEncoding, CheckDecodeArrowUsingDictBuilder) { + this->CheckDecodeArrowUsingDictBuilder(); +} + +TYPED_TEST(PlainEncoding, CheckDecodeArrowNonNullDenseBuilder) { + this->CheckDecodeArrowNonNullUsingDenseBuilder(); +} + +TYPED_TEST(PlainEncoding, CheckDecodeArrowNonNullDictBuilder) { + this->CheckDecodeArrowNonNullUsingDictBuilder(); +} + +template <typename DType> +class DictEncoding : public TestArrowBuilderDecoding<DType> { + public: + void SetupEncoderDecoder() override { + auto node = schema::ByteArray("name"); + descr_ = std::unique_ptr<ColumnDescriptor>(new ColumnDescriptor(node, 0, 0)); + encoder_ = MakeTypedEncoder<ByteArrayType>(Encoding::PLAIN, /*use_dictionary=*/true, + descr_.get()); + ASSERT_NO_THROW(encoder_->PutSpaced(input_data_.data(), num_values_, valid_bits_, 0)); + buffer_ = encoder_->FlushValues(); + + auto dict_encoder = dynamic_cast<DictEncoder<ByteArrayType>*>(encoder_.get()); + ASSERT_NE(dict_encoder, nullptr); + dict_buffer_ = + AllocateBuffer(default_memory_pool(), 
dict_encoder->dict_encoded_size()); + dict_encoder->WriteDict(dict_buffer_->mutable_data()); + + // Simulate reading the dictionary page followed by a data page + plain_decoder_ = MakeTypedDecoder<ByteArrayType>(Encoding::PLAIN, descr_.get()); + plain_decoder_->SetData(dict_encoder->num_entries(), dict_buffer_->data(), + static_cast<int>(dict_buffer_->size())); + + dict_decoder_ = MakeDictDecoder<ByteArrayType>(descr_.get()); + dict_decoder_->SetDict(plain_decoder_.get()); + dict_decoder_->SetData(num_values_, buffer_->data(), + static_cast<int>(buffer_->size())); + decoder_ = std::unique_ptr<ByteArrayDecoder>( + dynamic_cast<ByteArrayDecoder*>(dict_decoder_.release())); + } + + protected: + TEST_ARROW_BUILDER_BASE_MEMBERS(); + std::unique_ptr<ColumnDescriptor> descr_; + std::unique_ptr<ByteArrayDecoder> plain_decoder_; + std::unique_ptr<DictDecoder<ByteArrayType>> dict_decoder_; + std::shared_ptr<Buffer> dict_buffer_; +}; + +TYPED_TEST_CASE(DictEncoding, BuilderArrayTypes); + +TYPED_TEST(DictEncoding, CheckDecodeArrowUsingDenseBuilder) { + this->CheckDecodeArrowUsingDenseBuilder(); +} + +TYPED_TEST(DictEncoding, CheckDecodeArrowUsingDictBuilder) { + this->CheckDecodeArrowUsingDictBuilder(); +} + +TYPED_TEST(DictEncoding, CheckDecodeArrowNonNullDenseBuilder) { + this->CheckDecodeArrowNonNullUsingDenseBuilder(); +} + +TYPED_TEST(DictEncoding, CheckDecodeArrowNonNullDictBuilder) { + this->CheckDecodeArrowNonNullUsingDictBuilder(); +} + } // namespace test } // namespace parquet diff --git a/cpp/src/parquet/encoding.cc b/cpp/src/parquet/encoding.cc index da63067..217ee80 100644 --- a/cpp/src/parquet/encoding.cc +++ b/cpp/src/parquet/encoding.cc @@ -24,7 +24,6 @@ #include <utility> #include <vector> -#include "arrow/builder.h" #include "arrow/status.h" #include "arrow/util/bit-stream-utils.h" #include "arrow/util/bit-util.h" @@ -697,40 +696,12 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>, using Base::DecodeSpaced; using Base::PlainDecoder; - int 
DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::internal::ChunkedBinaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK( - DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); - return result; - } - - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::BinaryDictionaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK( - DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); - return result; - } - - int DecodeArrowNonNull(int num_values, - ::arrow::internal::ChunkedBinaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, out, &result)); - return result; - } - private: - template <typename BuilderType> ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, BuilderType* out, - int* values_decoded) { + int64_t valid_bits_offset, WrappedBuilderInterface* builder, + int* values_decoded) override { num_values = std::min(num_values, num_values_); - - ARROW_RETURN_NOT_OK(out->Reserve(num_values)); - + builder->Reserve(num_values); ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); int increment; int i = 0; @@ -744,15 +715,15 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>, if (data_size < increment) { ParquetException::EofException(); } - ARROW_RETURN_NOT_OK(out->Append(data + sizeof(uint32_t), len)); + builder->Append(data + sizeof(uint32_t), len); data += increment; data_size -= increment; bytes_decoded += increment; - ++i; } else { - ARROW_RETURN_NOT_OK(out->AppendNull()); + builder->AppendNull(); } bit_reader.Next(); + ++i; } data_ += bytes_decoded; @@ -762,23 +733,24 @@ class PlainByteArrayDecoder : public PlainDecoder<ByteArrayType>, return ::arrow::Status::OK(); } - ::arrow::Status 
DecodeArrowNonNull(int num_values, - ::arrow::internal::ChunkedBinaryBuilder* out, - int* values_decoded) { + ::arrow::Status DecodeArrowNonNull(int num_values, WrappedBuilderInterface* builder, + int* values_decoded) override { num_values = std::min(num_values, num_values_); - ARROW_RETURN_NOT_OK(out->Reserve(num_values)); + builder->Reserve(num_values); int i = 0; const uint8_t* data = data_; int64_t data_size = len_; int bytes_decoded = 0; + while (i < num_values) { uint32_t len = *reinterpret_cast<const uint32_t*>(data); int increment = static_cast<int>(sizeof(uint32_t) + len); if (data_size < increment) ParquetException::EofException(); - ARROW_RETURN_NOT_OK(out->Append(data + sizeof(uint32_t), len)); + builder->Append(data + sizeof(uint32_t), len); data += increment; data_size -= increment; bytes_decoded += increment; + ++i; } data_ += bytes_decoded; @@ -916,39 +888,13 @@ class DictByteArrayDecoder : public DictDecoderImpl<ByteArrayType>, using BASE = DictDecoderImpl<ByteArrayType>; using BASE::DictDecoderImpl; - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::internal::ChunkedBinaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK( - DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); - return result; - } - - int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - int64_t valid_bits_offset, - ::arrow::BinaryDictionaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK( - DecodeArrow(num_values, null_count, valid_bits, valid_bits_offset, out, &result)); - return result; - } - - int DecodeArrowNonNull(int num_values, - ::arrow::internal::ChunkedBinaryBuilder* out) override { - int result = 0; - PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, out, &result)); - return result; - } - private: - template <typename BuilderType> ::arrow::Status DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, - 
int64_t valid_bits_offset, BuilderType* builder, - int* out_num_values) { + int64_t valid_bits_offset, WrappedBuilderInterface* builder, + int* out_num_values) override { constexpr int32_t buffer_size = 1024; int32_t indices_buffer[buffer_size]; - + builder->Reserve(num_values); ::arrow::internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, num_values); int values_decoded = 0; @@ -966,10 +912,10 @@ class DictByteArrayDecoder : public DictDecoderImpl<ByteArrayType>, // Consume all indices if (is_valid) { const auto& val = dictionary_[indices_buffer[i]]; - ARROW_RETURN_NOT_OK(builder->Append(val.ptr, val.len)); + builder->Append(val.ptr, val.len); ++i; } else { - ARROW_RETURN_NOT_OK(builder->AppendNull()); + builder->AppendNull(); --null_count; } ++values_decoded; @@ -982,7 +928,7 @@ class DictByteArrayDecoder : public DictDecoderImpl<ByteArrayType>, bit_reader.Next(); } } else { - ARROW_RETURN_NOT_OK(builder->AppendNull()); + builder->AppendNull(); --null_count; ++values_decoded; } @@ -995,18 +941,20 @@ class DictByteArrayDecoder : public DictDecoderImpl<ByteArrayType>, return ::arrow::Status::OK(); } - template <typename BuilderType> - ::arrow::Status DecodeArrowNonNull(int num_values, BuilderType* builder, - int* out_num_values) { + ::arrow::Status DecodeArrowNonNull(int num_values, WrappedBuilderInterface* builder, + int* out_num_values) override { constexpr int32_t buffer_size = 2048; int32_t indices_buffer[buffer_size]; int values_decoded = 0; + builder->Reserve(num_values); + while (values_decoded < num_values) { - int num_indices = idx_decoder_.GetBatch(indices_buffer, buffer_size); + int32_t batch_size = std::min<int32_t>(buffer_size, num_values - values_decoded); + int num_indices = idx_decoder_.GetBatch(indices_buffer, batch_size); if (num_indices == 0) break; for (int i = 0; i < num_indices; ++i) { const auto& val = dictionary_[indices_buffer[i]]; - PARQUET_THROW_NOT_OK(builder->Append(val.ptr, val.len)); + builder->Append(val.ptr, val.len); } 
       values_decoded += num_indices;
     }
diff --git a/cpp/src/parquet/encoding.h b/cpp/src/parquet/encoding.h
index 046296c..09c1d0f 100644
--- a/cpp/src/parquet/encoding.h
+++ b/cpp/src/parquet/encoding.h
@@ -32,17 +32,6 @@
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

-namespace arrow {
-
-class BinaryDictionaryBuilder;
-
-namespace internal {
-
-class ChunkedBinaryBuilder;
-
-}  // namespace internal
-}  // namespace arrow
-
 namespace parquet {

 class ColumnDescriptor;
@@ -207,18 +196,61 @@ using DoubleDecoder = TypedDecoder<DoubleType>;
 class ByteArrayDecoder : virtual public TypedDecoder<ByteArrayType> {
  public:
   using TypedDecoder<ByteArrayType>::DecodeSpaced;
-  virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                          int64_t valid_bits_offset,
-                          ::arrow::internal::ChunkedBinaryBuilder* builder) = 0;
-
-  virtual int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
-                          int64_t valid_bits_offset,
-                          ::arrow::BinaryDictionaryBuilder* builder) = 0;
-
-  // TODO(wesm): Implement DecodeArrowNonNull as part of ARROW-3325
-  // See also ARROW-3772, ARROW-3769
-  virtual int DecodeArrowNonNull(int num_values,
-                                 ::arrow::internal::ChunkedBinaryBuilder* builder) = 0;
+
+  class WrappedBuilderInterface {
+   public:
+    virtual void Reserve(int64_t values) = 0;
+    virtual void Append(const uint8_t* value, uint32_t length) = 0;
+    virtual void AppendNull() = 0;
+    virtual ~WrappedBuilderInterface() = default;
+  };
+
+  template <typename Builder>
+  class WrappedBuilder : public WrappedBuilderInterface {
+   public:
+    explicit WrappedBuilder(Builder* builder) : builder_(builder) {}
+
+    void Reserve(int64_t values) override {
+      PARQUET_THROW_NOT_OK(builder_->Reserve(values));
+    }
+    void Append(const uint8_t* value, uint32_t length) override {
+      PARQUET_THROW_NOT_OK(builder_->Append(value, length));
+    }
+
+    void AppendNull() override { PARQUET_THROW_NOT_OK(builder_->AppendNull()); }
+
+   private:
+    Builder* builder_;
+  };
+
+  template <typename Builder>
+  int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
+                  int64_t valid_bits_offset, Builder* builder) {
+    int result = 0;
+    WrappedBuilder<Builder> wrapped_builder(builder);
+    PARQUET_THROW_NOT_OK(DecodeArrow(num_values, null_count, valid_bits,
+                                     valid_bits_offset, &wrapped_builder, &result));
+    return result;
+  }
+
+  template <typename Builder>
+  int DecodeArrowNonNull(int num_values, Builder* builder) {
+    int result = 0;
+    WrappedBuilder<Builder> wrapped_builder(builder);
+    PARQUET_THROW_NOT_OK(DecodeArrowNonNull(num_values, &wrapped_builder, &result));
+    return result;
+  }
+
+ private:
+  virtual ::arrow::Status DecodeArrow(int num_values, int null_count,
+                                      const uint8_t* valid_bits,
+                                      int64_t valid_bits_offset,
+                                      WrappedBuilderInterface* builder,
+                                      int* values_decoded) = 0;
+
+  virtual ::arrow::Status DecodeArrowNonNull(int num_values,
+                                             WrappedBuilderInterface* builder,
+                                             int* values_decoded) = 0;
 };

 class FLBADecoder : virtual public TypedDecoder<FLBAType> {