This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new bec0385679 PARQUET-2411: [C++][Parquet] Allow reading dictionary
without reading data via ByteArrayDictionaryRecordReader (#39153)
bec0385679 is described below
commit bec03856799a69bf0e6d4419ab7bc565afd070fe
Author: Jinpeng <[email protected]>
AuthorDate: Thu Jan 4 21:41:01 2024 -0500
PARQUET-2411: [C++][Parquet] Allow reading dictionary without reading data
via ByteArrayDictionaryRecordReader (#39153)
### Rationale for this change
This proposes an API to read only the dictionary from
ByteArrayDictionaryRecordReader, enabling use cases where the caller
only wants to check the dictionary content.
### What changes are included in this PR?
New APIs to enable reading the dictionary through RecordReader.
### Are these changes tested?
Unit tests.
### Are there any user-facing changes?
New APIs are added; existing workflows are not affected.
Authored-by: jp0317 <[email protected]>
Signed-off-by: mwish <[email protected]>
---
cpp/src/parquet/column_reader.cc | 20 ++++++
cpp/src/parquet/column_reader.h | 10 +++
cpp/src/parquet/file_reader.cc | 79 ++++++++++++++----------
cpp/src/parquet/file_reader.h | 15 ++++-
cpp/src/parquet/reader_test.cc | 127 +++++++++++++++++++++++++++++++++++++++
5 files changed, 217 insertions(+), 34 deletions(-)
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index a49e58afbd..99978e283b 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -1370,6 +1370,26 @@ class TypedRecordReader : public
TypedColumnReaderImpl<DType>,
return bytes_for_values;
}
+  // Returns the dictionary held by the current DictDecoder. If no decoder has been
+  // set up yet, HasNextInternal() is consulted first; when it reports no further
+  // page, *dictionary_length is set to 0 and nullptr is returned. Throws
+  // ParquetException when the current page is not dictionary encoded.
+  const void* ReadDictionary(int32_t* dictionary_length) override {
+    if (this->current_decoder_ == nullptr && !this->HasNextInternal()) {
+      // Write through the out-parameter (not to the local pointer copy).
+      *dictionary_length = 0;
+      return nullptr;
+    }
+    // Verify the current data page is dictionary encoded. current_encoding_ should
+    // have been set to RLE_DICTIONARY if the page encoding is RLE_DICTIONARY or
+    // PLAIN_DICTIONARY. The downcast is checked too, so an unexpected decoder type
+    // raises an exception instead of dereferencing a null pointer.
+    auto* decoder = dynamic_cast<DictDecoder<DType>*>(this->current_decoder_);
+    if (this->current_encoding_ != Encoding::RLE_DICTIONARY || decoder == nullptr) {
+      std::stringstream ss;
+      ss << "Data page is not dictionary encoded. Encoding: "
+         << EncodingToString(this->current_encoding_);
+      throw ParquetException(ss.str());
+    }
+    const T* dictionary = nullptr;
+    decoder->GetDictionary(&dictionary, dictionary_length);
+    return reinterpret_cast<const void*>(dictionary);
+  }
+
int64_t ReadRecords(int64_t num_records) override {
if (num_records == 0) return 0;
// Delimit records, then read values at the end
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index 334b8bcffe..086f6c0e55 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -368,6 +368,16 @@ class PARQUET_EXPORT RecordReader {
virtual void DebugPrintState() = 0;
+ /// \brief Returns the dictionary owned by the current decoder. Throws an
+ /// exception if the current decoder is not for dictionary encoding. If no page
+ /// is currently loaded and no further page is available, nullptr is returned.
+ /// The dictionary may be requested before any records have been read.
+ ///
+ /// The caller is responsible for casting the returned pointer to the proper type
+ /// depending on the column's physical type. An example:
+ ///   const ByteArray* dict = reinterpret_cast<const ByteArray*>(ReadDictionary(&len));
+ /// or:
+ ///   const float* dict = reinterpret_cast<const float*>(ReadDictionary(&len));
+ /// \param[out] dictionary_length The number of dictionary entries.
+ virtual const void* ReadDictionary(int32_t* dictionary_length) = 0;
+
/// \brief Decoded definition levels
int16_t* def_levels() const {
return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 1d972b78fb..b3dd1d6054 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -54,6 +54,36 @@ using arrow::internal::AddWithOverflow;
namespace parquet {
+namespace {
+// Returns true only if encoding_stats prove every data page is dictionary encoded:
+// a PLAIN/PLAIN_DICTIONARY dictionary page plus only dictionary-encoded data pages,
+// checked regardless of entry order. Empty encoding_stats (some writers) yield false.
+bool IsColumnChunkFullyDictionaryEncoded(const ColumnChunkMetaData& col) {
+  const std::vector<PageEncodingStats>& encoding_stats = col.encoding_stats();
+  bool has_dictionary_page = false;
+  for (const PageEncodingStats& stats : encoding_stats) {
+    if (stats.page_type == PageType::DICTIONARY_PAGE) {
+      if (stats.encoding != Encoding::PLAIN &&
+          stats.encoding != Encoding::PLAIN_DICTIONARY) {
+        return false;
+      }
+      has_dictionary_page = true;
+    } else if (stats.page_type == PageType::DATA_PAGE ||
+               stats.page_type == PageType::DATA_PAGE_V2) {
+      if (stats.encoding != Encoding::RLE_DICTIONARY &&
+          stats.encoding != Encoding::PLAIN_DICTIONARY) {
+        // A data page fell back to a non-dictionary encoding.
+        return false;
+      }
+    } else {
+      // Unexpected page type: we cannot vouch for the chunk, so be conservative.
+      return false;
+    }
+  }
+  return has_dictionary_page;
+}
+}  // namespace
+
// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
static constexpr int64_t kDefaultFooterReadSize = 64 * 1024;
static constexpr uint32_t kFooterSize = 8;
@@ -82,7 +112,8 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
}
-std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(int i) {
+// Creates a RecordReader for column i of this row group. read_dictionary is
+// forwarded to internal::RecordReader::Make; callers should check the returned
+// reader's read_dictionary() to learn whether dictionary reading is in effect.
+std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(
+ int i, bool read_dictionary) {
if (i >= metadata()->num_columns()) {
std::stringstream ss;
ss << "Trying to read column index " << i << " but row group metadata has
only "
@@ -96,8 +127,8 @@ std::shared_ptr<internal::RecordReader>
RowGroupReader::RecordReader(int i) {
internal::LevelInfo level_info =
internal::LevelInfo::ComputeLevelInfo(descr);
auto reader = internal::RecordReader::Make(
- descr, level_info, contents_->properties()->memory_pool(),
- /* read_dictionary = */ false,
contents_->properties()->read_dense_for_nullable());
+ descr, level_info, contents_->properties()->memory_pool(),
read_dictionary,
+ contents_->properties()->read_dense_for_nullable());
reader->SetPageReader(std::move(page_reader));
return reader;
}
@@ -106,41 +137,23 @@ std::shared_ptr<ColumnReader>
RowGroupReader::ColumnWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose) {
std::shared_ptr<ColumnReader> reader = Column(i);
- if (encoding_to_expose == ExposedEncoding::DICTIONARY) {
- // Check the encoding_stats to see if all data pages are dictionary
encoded.
- std::unique_ptr<ColumnChunkMetaData> col = metadata()->ColumnChunk(i);
- const std::vector<PageEncodingStats>& encoding_stats =
col->encoding_stats();
- if (encoding_stats.empty()) {
- // Some parquet files may have empty encoding_stats. In this case we are
- // not sure whether all data pages are dictionary encoded. So we do not
- // enable exposing dictionary.
- return reader;
- }
- // The 1st page should be the dictionary page.
- if (encoding_stats[0].page_type != PageType::DICTIONARY_PAGE ||
- (encoding_stats[0].encoding != Encoding::PLAIN &&
- encoding_stats[0].encoding != Encoding::PLAIN_DICTIONARY)) {
- return reader;
- }
- // The following pages should be dictionary encoded data pages.
- for (size_t idx = 1; idx < encoding_stats.size(); ++idx) {
- if ((encoding_stats[idx].encoding != Encoding::RLE_DICTIONARY &&
- encoding_stats[idx].encoding != Encoding::PLAIN_DICTIONARY) ||
- (encoding_stats[idx].page_type != PageType::DATA_PAGE &&
- encoding_stats[idx].page_type != PageType::DATA_PAGE_V2)) {
- return reader;
- }
- }
- } else {
- // Exposing other encodings are not supported for now.
- return reader;
+ // Only DICTIONARY can be exposed; any other encoding_to_expose leaves the reader
+ // as returned by Column(i).
+ if (encoding_to_expose == ExposedEncoding::DICTIONARY &&
+ IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i))) {
+ // Every data page is dictionary encoded per the chunk metadata, so it is safe
+ // to expose the dictionary to the caller.
+ reader->SetExposedEncoding(encoding_to_expose);
}
- // Set exposed encoding.
- reader->SetExposedEncoding(encoding_to_expose);
return reader;
}
+// Requests dictionary reading only when the caller asks for DICTIONARY and the
+// column chunk metadata shows every data page is dictionary encoded; otherwise the
+// reader is created with read_dictionary = false.
+std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReaderWithExposeEncoding(
+    int i, ExposedEncoding encoding_to_expose) {
+  const bool expose_dictionary =
+      encoding_to_expose == ExposedEncoding::DICTIONARY &&
+      IsColumnChunkFullyDictionaryEncoded(*metadata()->ColumnChunk(i));
+  return RecordReader(i, /*read_dictionary=*/expose_dictionary);
+}
+
std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
if (i >= metadata()->num_columns()) {
std::stringstream ss;
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index da85b73fc2..b59b59f95c 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -64,7 +64,8 @@ class PARQUET_EXPORT RowGroupReader {
// EXPERIMENTAL: Construct a RecordReader for the indicated column of the
row group.
// Ownership is shared with the RowGroupReader.
- std::shared_ptr<internal::RecordReader> RecordReader(int i);
+ // read_dictionary is forwarded to internal::RecordReader::Make to request reading
+ // dictionary-encoded values directly; check the returned reader's
+ // read_dictionary() to see whether it took effect for this column.
+ std::shared_ptr<internal::RecordReader> RecordReader(int i,
+ bool read_dictionary =
false);
// Construct a ColumnReader, trying to enable exposed encoding.
//
@@ -80,6 +81,18 @@ class PARQUET_EXPORT RowGroupReader {
std::shared_ptr<ColumnReader> ColumnWithExposeEncoding(
int i, ExposedEncoding encoding_to_expose);
+ // Construct a RecordReader, trying to enable exposed encoding.
+ //
+ // For dictionary encoding, currently we only support column chunks that are fully
+ // dictionary encoded byte arrays. Whether a chunk is fully dictionary encoded is
+ // decided from its encoding_stats metadata; chunks without encoding_stats are
+ // treated as not fully dictionary encoded. The caller should check whether the
+ // reader can read and expose the dictionary via the reader's read_dictionary().
+ // If a column chunk uses dictionary encoding but then falls back to plain
+ // encoding, the returned reader will read decoded data without exposing the
+ // dictionary.
+ //
+ // \note API EXPERIMENTAL
+ std::shared_ptr<internal::RecordReader> RecordReaderWithExposeEncoding(
+ int i, ExposedEncoding encoding_to_expose);
+
std::unique_ptr<PageReader> GetColumnPageReader(int i);
private:
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 5223158e5f..2c2b62f5d1 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -542,6 +542,83 @@ TEST(TestFileReader, GetRecordReader) {
ASSERT_EQ(8, col_record_reader_->levels_written());
}
+TEST(TestFileReader, RecordReaderWithExposingDictionary) {
+  const int num_rows = 1000;
+
+  // Make schema
+  schema::NodeVector fields;
+  fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::BYTE_ARRAY,
+                                       ConvertedType::NONE));
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Write small batches and small data pages
+  std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder()
+                                                       .write_batch_size(64)
+                                                       ->data_pagesize(128)
+                                                       ->enable_dictionary()
+                                                       ->build();
+
+  ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create());
+  std::shared_ptr<ParquetFileWriter> file_writer =
+      ParquetFileWriter::Open(out_file, schema, writer_props);
+
+  RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+  // Write one column cycling through three unique values; the assertions below
+  // rely on the chunk staying fully dictionary encoded.
+  ByteArrayWriter* writer = static_cast<ByteArrayWriter*>(rg_writer->NextColumn());
+  std::vector<std::string> raw_unique_data = {"a", "bc", "defg"};
+  std::vector<ByteArray> col_typed;
+  col_typed.reserve(num_rows);
+  for (int i = 0; i < num_rows; i++) {
+    std::string_view chosen_data = raw_unique_data[i % raw_unique_data.size()];
+    col_typed.emplace_back(chosen_data);
+  }
+  writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.data());
+  rg_writer->Close();
+  file_writer->Close();
+
+  // Open the reader
+  ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish());
+  auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);
+
+  ReaderProperties reader_props;
+  reader_props.enable_buffered_stream();
+  reader_props.set_buffer_size(64);
+  std::unique_ptr<ParquetFileReader> file_reader =
+      ParquetFileReader::Open(in_file, reader_props);
+
+  auto row_group = file_reader->RowGroup(0);
+  auto record_reader = std::dynamic_pointer_cast<internal::DictionaryRecordReader>(
+      row_group->RecordReaderWithExposeEncoding(0, ExposedEncoding::DICTIONARY));
+  ASSERT_NE(record_reader, nullptr);
+  ASSERT_TRUE(record_reader->read_dictionary());
+
+  // The dictionary is requested before any records are read.
+  int32_t dict_len = 0;
+  auto dict =
+      reinterpret_cast<const ByteArray*>(record_reader->ReadDictionary(&dict_len));
+  ASSERT_NE(dict, nullptr);
+  ASSERT_EQ(dict_len, static_cast<int32_t>(raw_unique_data.size()));
+  ASSERT_EQ(record_reader->ReadRecords(num_rows), num_rows);
+  std::shared_ptr<::arrow::ChunkedArray> result_array = record_reader->GetResult();
+  ASSERT_EQ(result_array->num_chunks(), 1);
+  const std::shared_ptr<::arrow::Array> chunk = result_array->chunk(0);
+  // Check every downcast so a type mismatch fails the test instead of crashing it.
+  auto dictionary_array = std::dynamic_pointer_cast<::arrow::DictionaryArray>(chunk);
+  ASSERT_NE(dictionary_array, nullptr);
+  auto indices_array =
+      std::dynamic_pointer_cast<::arrow::Int32Array>(dictionary_array->indices());
+  ASSERT_NE(indices_array, nullptr);
+  const int32_t* indices = indices_array->raw_values();
+
+  // Verify values based on the dictionary from ReadDictionary().
+  int64_t indices_read = chunk->length();
+  ASSERT_EQ(indices_read, num_rows);
+  for (int64_t i = 0; i < indices_read; ++i) {
+    ASSERT_GE(indices[i], 0);
+    ASSERT_LT(indices[i], dict_len);
+    ASSERT_EQ(std::string_view(reinterpret_cast<const char*>(dict[indices[i]].ptr),
+                               dict[indices[i]].len),
+              col_typed[i]);
+  }
+}
+
class TestLocalFile : public ::testing::Test {
public:
void SetUp() {
@@ -1064,6 +1141,56 @@ TEST(TestFileReader, BufferedReadsWithDictionary) {
}
}
+TEST(TestFileReader, PartiallyDictionaryEncodingNotExposed) {
+  const int num_rows = 1000;
+
+  // Make schema
+  schema::NodeVector fields;
+  fields.push_back(PrimitiveNode::Make("field", Repetition::REQUIRED, Type::DOUBLE,
+                                       ConvertedType::NONE));
+  auto schema = std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // Write small batches and small data pages. Explicitly set the dictionary page size
+  // limit such that the column chunk will not be fully dictionary encoded.
+  std::shared_ptr<WriterProperties> writer_props = WriterProperties::Builder()
+                                                       .write_batch_size(64)
+                                                       ->data_pagesize(128)
+                                                       ->enable_dictionary()
+                                                       ->dictionary_pagesize_limit(4)
+                                                       ->build();
+
+  ASSERT_OK_AND_ASSIGN(auto out_file, ::arrow::io::BufferOutputStream::Create());
+  std::shared_ptr<ParquetFileWriter> file_writer =
+      ParquetFileWriter::Open(out_file, schema, writer_props);
+
+  RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
+
+  // Write one column of generated doubles.
+  ::arrow::random::RandomArrayGenerator rag(0);
+  DoubleWriter* writer = static_cast<DoubleWriter*>(rg_writer->NextColumn());
+  std::shared_ptr<::arrow::Array> col = rag.Float64(num_rows, 0, 100);
+  const auto& col_typed = static_cast<const ::arrow::DoubleArray&>(*col);
+  writer->WriteBatch(num_rows, nullptr, nullptr, col_typed.raw_values());
+  rg_writer->Close();
+  file_writer->Close();
+
+  // Open the reader
+  ASSERT_OK_AND_ASSIGN(auto file_buf, out_file->Finish());
+  auto in_file = std::make_shared<::arrow::io::BufferReader>(file_buf);
+
+  ReaderProperties reader_props;
+  reader_props.enable_buffered_stream();
+  reader_props.set_buffer_size(64);
+  std::unique_ptr<ParquetFileReader> file_reader =
+      ParquetFileReader::Open(in_file, reader_props);
+
+  auto row_group = file_reader->RowGroup(0);
+  auto col_reader = std::static_pointer_cast<DoubleReader>(
+      row_group->ColumnWithExposeEncoding(0, ExposedEncoding::DICTIONARY));
+  EXPECT_NE(col_reader->GetExposedEncoding(), ExposedEncoding::DICTIONARY);
+
+  // The RecordReader variant must also fall back to decoded values for this chunk.
+  auto record_reader =
+      row_group->RecordReaderWithExposeEncoding(0, ExposedEncoding::DICTIONARY);
+  ASSERT_NE(record_reader, nullptr);
+  EXPECT_FALSE(record_reader->read_dictionary());
+}
+
TEST(TestFileReader, BufferedReads) {
// PARQUET-1636: Buffered reads were broken before introduction of
// RandomAccessFile::GetStream