This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new d3a576d03a GH-37002: [C++][Parquet] Add api to get RecordReader from
RowGroupReader (#37003)
d3a576d03a is described below
commit d3a576d03ad943773d9c55974f9c797a5042b4a8
Author: Fatemah Panahi <[email protected]>
AuthorDate: Mon Oct 16 18:16:52 2023 -0700
GH-37002: [C++][Parquet] Add api to get RecordReader from RowGroupReader
(#37003)
Currently we can only get a ColumnReader for a column from RowGroupReader.
We need an API to return a RecordReader for the column.
Moved ComputeLevelInfo from column_writer to level_conversion so that it
can be shared between column_writer and file_reader.
* Closes: #37002
Lead-authored-by: Fatemah Panahi <[email protected]>
Co-authored-by: Fatemah Panahi <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
---
cpp/src/parquet/column_writer.cc | 19 +------------------
cpp/src/parquet/file_reader.cc | 20 ++++++++++++++++++++
cpp/src/parquet/file_reader.h | 8 ++++++++
cpp/src/parquet/level_conversion.h | 18 ++++++++++++++++++
cpp/src/parquet/properties.h | 6 ++++++
cpp/src/parquet/reader_test.cc | 36 ++++++++++++++++++++++++++++++++++++
6 files changed, 89 insertions(+), 18 deletions(-)
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 88829ef5d9..72e984d773 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -140,23 +140,6 @@ struct ValueBufferSlicer {
   MemoryPool* pool_;
 };
 
-internal::LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
-  internal::LevelInfo level_info;
-  level_info.def_level = descr->max_definition_level();
-  level_info.rep_level = descr->max_repetition_level();
-
-  int16_t min_spaced_def_level = descr->max_definition_level();
-  const ::parquet::schema::Node* node = descr->schema_node().get();
-  while (node != nullptr && !node->is_repeated()) {
-    if (node->is_optional()) {
-      min_spaced_def_level--;
-    }
-    node = node->parent();
-  }
-  level_info.repeated_ancestor_def_level = min_spaced_def_level;
-  return level_info;
-}
-
 template <class T>
 inline const T* AddIfNotNull(const T* base, int64_t offset) {
   if (base != nullptr) {
@@ -738,7 +721,7 @@ class ColumnWriterImpl {
                    Encoding::type encoding, const WriterProperties* properties)
       : metadata_(metadata),
         descr_(metadata->descr()),
-        level_info_(ComputeLevelInfo(metadata->descr())),
+        level_info_(internal::LevelInfo::ComputeLevelInfo(metadata->descr())),
         pager_(std::move(pager)),
         has_dictionary_(use_dictionary),
         encoding_(encoding),
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 7318b5640c..1d972b78fb 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -82,6 +82,26 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
       const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
 }
 
+std::shared_ptr<internal::RecordReader> RowGroupReader::RecordReader(int i) {
+  if (i >= metadata()->num_columns()) {
+    std::stringstream ss;
+    ss << "Trying to read column index " << i << " but row group metadata has only "
+       << metadata()->num_columns() << " columns";
+    throw ParquetException(ss.str());
+  }
+  const ColumnDescriptor* descr = metadata()->schema()->Column(i);
+
+  std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
+
+  internal::LevelInfo level_info = internal::LevelInfo::ComputeLevelInfo(descr);
+
+  auto reader = internal::RecordReader::Make(
+      descr, level_info, contents_->properties()->memory_pool(),
+      /* read_dictionary = */ false, contents_->properties()->read_dense_for_nullable());
+  reader->SetPageReader(std::move(page_reader));
+  return reader;
+}
+
 std::shared_ptr<ColumnReader> RowGroupReader::ColumnWithExposeEncoding(
     int i, ExposedEncoding encoding_to_expose) {
   std::shared_ptr<ColumnReader> reader = Column(i);
diff --git a/cpp/src/parquet/file_reader.h b/cpp/src/parquet/file_reader.h
index 13fcd196c4..da85b73fc2 100644
--- a/cpp/src/parquet/file_reader.h
+++ b/cpp/src/parquet/file_reader.h
@@ -37,6 +37,10 @@ class BloomFilterReader;
 class PageReader;
 class RowGroupMetaData;
 
+namespace internal {
+class RecordReader;
+}
+
 class PARQUET_EXPORT RowGroupReader {
  public:
   // Forward declare a virtual class 'Contents' to aid dependency injection and more
@@ -58,6 +62,10 @@ class PARQUET_EXPORT RowGroupReader {
   // column. Ownership is shared with the RowGroupReader.
   std::shared_ptr<ColumnReader> Column(int i);
 
+  // EXPERIMENTAL: Construct a RecordReader for the indicated column of the row group.
+  // Ownership is shared with the RowGroupReader.
+  std::shared_ptr<internal::RecordReader> RecordReader(int i);
+
   // Construct a ColumnReader, trying to enable exposed encoding.
   //
   // For dictionary encoding, currently we only support column chunks that are fully
diff --git a/cpp/src/parquet/level_conversion.h b/cpp/src/parquet/level_conversion.h
index 3f56b2de36..2c6f628319 100644
--- a/cpp/src/parquet/level_conversion.h
+++ b/cpp/src/parquet/level_conversion.h
@@ -121,6 +121,24 @@ struct PARQUET_EXPORT LevelInfo {
     return last_repeated_ancestor;
   }
 
+  // Calculates and returns LevelInfo for a column descriptor.
+  static LevelInfo ComputeLevelInfo(const ColumnDescriptor* descr) {
+    LevelInfo level_info;
+    level_info.def_level = descr->max_definition_level();
+    level_info.rep_level = descr->max_repetition_level();
+
+    int16_t min_spaced_def_level = descr->max_definition_level();
+    const ::parquet::schema::Node* node = descr->schema_node().get();
+    while (node && !node->is_repeated()) {
+      if (node->is_optional()) {
+        min_spaced_def_level--;
+      }
+      node = node->parent();
+    }
+    level_info.repeated_ancestor_def_level = min_spaced_def_level;
+    return level_info;
+  }
+
   friend std::ostream& operator<<(std::ostream& os, const LevelInfo& levels) {
     // This print method is to silence valgrind issues. What's printed
     // is not important because all asserts happen directly on
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 6d3d5aa4f4..4d3acb491e 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -79,6 +79,10 @@ class PARQUET_EXPORT ReaderProperties {
   /// Disable buffered stream reading.
   void disable_buffered_stream() { buffered_stream_enabled_ = false; }
 
+  bool read_dense_for_nullable() const { return read_dense_for_nullable_; }
+  void enable_read_dense_for_nullable() { read_dense_for_nullable_ = true; }
+  void disable_read_dense_for_nullable() { read_dense_for_nullable_ = false; }
+
   /// Return the size of the buffered stream buffer.
   int64_t buffer_size() const { return buffer_size_; }
   /// Set the size of the buffered stream buffer in bytes.
@@ -123,6 +127,8 @@ class PARQUET_EXPORT ReaderProperties {
   int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
   bool buffered_stream_enabled_ = false;
   bool page_checksum_verification_ = false;
+  // Used with a RecordReader.
+  bool read_dense_for_nullable_ = false;
   std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
 };
 
diff --git a/cpp/src/parquet/reader_test.cc b/cpp/src/parquet/reader_test.cc
index 0a73002846..8fe12d3de0 100644
--- a/cpp/src/parquet/reader_test.cc
+++ b/cpp/src/parquet/reader_test.cc
@@ -502,6 +502,42 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
   ASSERT_THROW(printer2.DebugPrint(ss, columns), ParquetException);
 }
 
+// Tests that read_dense_for_nullable is passed down to the record
+// reader. The functionality of read_dense_for_nullable is tested
+// elsewhere.
+TEST(TestFileReader, RecordReaderReadDenseForNullable) {
+  // We test the default which is false, and also test enabling and disabling
+  // read_dense_for_nullable.
+  std::vector<ReaderProperties> reader_properties(3);
+  reader_properties[1].enable_read_dense_for_nullable();
+  reader_properties[2].disable_read_dense_for_nullable();
+  for (const auto& reader_props : reader_properties) {
+    std::unique_ptr<ParquetFileReader> file_reader = ParquetFileReader::OpenFile(
+        alltypes_plain(), /* memory_map = */ false, reader_props);
+    std::shared_ptr<RowGroupReader> group = file_reader->RowGroup(0);
+    std::shared_ptr<internal::RecordReader> col_record_reader = group->RecordReader(0);
+    ASSERT_EQ(reader_props.read_dense_for_nullable(),
+              col_record_reader->read_dense_for_nullable());
+  }
+}
+
+// Tests getting a record reader from a row group reader.
+TEST(TestFileReader, GetRecordReader) {
+  ReaderProperties reader_props;
+  std::unique_ptr<ParquetFileReader> file_reader = ParquetFileReader::OpenFile(
+      alltypes_plain(), /* memory_map = */ false, reader_props);
+  std::shared_ptr<RowGroupReader> group = file_reader->RowGroup(0);
+
+  std::shared_ptr<internal::RecordReader> col_record_reader_ = group->RecordReader(0);
+
+  ASSERT_TRUE(col_record_reader_->HasMoreData());
+  auto records_read = col_record_reader_->ReadRecords(4);
+  ASSERT_EQ(records_read, 4);
+  ASSERT_EQ(4, col_record_reader_->values_written());
+  ASSERT_EQ(4, col_record_reader_->levels_position());
+  ASSERT_EQ(8, col_record_reader_->levels_written());
+}
+
 class TestLocalFile : public ::testing::Test {
  public:
   void SetUp() {