This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e6e8f3fc5 GH-45339: [Parquet][C++] Fix statistics load logic for no 
row group and multiple row groups (#45350)
3e6e8f3fc5 is described below

commit 3e6e8f3fc59777957334c90e02d7826fb27e2488
Author: Sutou Kouhei <[email protected]>
AuthorDate: Thu Jan 30 13:27:08 2025 +0900

    GH-45339: [Parquet][C++] Fix statistics load logic for no row group and 
multiple row groups (#45350)
    
    ### Rationale for this change
    
    The logic that loads `arrow::ArrayStatistics` depends on `parquet::ColumnChunkMetaData`.
    
    We can't get a `parquet::ColumnChunkMetaData` when the requested row groups are empty, because no associated row group or column chunk exists.
    
    For now, we can't use multiple `parquet::ColumnChunkMetaData`s because we don't have statistics merge logic, so we can't load statistics when a read spans multiple row groups.
    
    ### What changes are included in this PR?
    
    * Don't load statistics when no row groups are used
    * Don't load statistics when multiple row groups are used
    * Add `parquet::ArrowReaderProperties::{set_,}should_load_statistics()` to force statistics loading by reading row groups one at a time
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes.
    * GitHub Issue: #45339
    
    Authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 102 ++++++++++++++++++++++
 cpp/src/parquet/arrow/arrow_statistics_test.cc    |  55 ++++++++++--
 cpp/src/parquet/arrow/reader.cc                   |  20 ++++-
 cpp/src/parquet/arrow/reader_internal.cc          |   4 +
 cpp/src/parquet/arrow/reader_internal.h           |   1 +
 cpp/src/parquet/arrow/test_util.h                 |   4 +-
 cpp/src/parquet/properties.h                      |  13 ++-
 7 files changed, 187 insertions(+), 12 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc 
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 47a00016b9..27cb849365 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -4296,6 +4296,108 @@ TEST(TestArrowReaderAdHoc, ReadFloat16Files) {
   }
 }
 
+TEST(TestArrowFileReader, RecordBatchReaderEmptyRowGroups) {
+  const int num_columns = 1;
+  const int num_rows = 3;
+  const int num_chunks = 1;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, 
&table));
+
+  const int64_t row_group_size = num_rows;
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             
default_arrow_writer_properties(), &buffer));
+
+  auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ASSERT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), 
&file_reader));
+  // This is the important part in this test.
+  std::vector<int> row_group_indices = {};
+  ASSERT_OK_AND_ASSIGN(auto record_batch_reader,
+                       file_reader->GetRecordBatchReader(row_group_indices));
+  std::shared_ptr<::arrow::RecordBatch> record_batch;
+  ASSERT_OK(record_batch_reader->ReadNext(&record_batch));
+  // No read record batch for empty row groups request.
+  ASSERT_FALSE(record_batch);
+}
+
+TEST(TestArrowFileReader, RecordBatchReaderEmptyInput) {
+  const int num_columns = 1;
+  // This is the important part in this test.
+  const int num_rows = 0;
+  const int num_chunks = 1;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, 
&table));
+
+  const int64_t row_group_size = num_rows;
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             
default_arrow_writer_properties(), &buffer));
+
+  auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ASSERT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), 
&file_reader));
+  ASSERT_OK_AND_ASSIGN(auto record_batch_reader, 
file_reader->GetRecordBatchReader());
+  std::shared_ptr<::arrow::RecordBatch> record_batch;
+  ASSERT_OK(record_batch_reader->ReadNext(&record_batch));
+  // No read record batch for empty data.
+  ASSERT_FALSE(record_batch);
+}
+
+TEST(TestArrowColumnReader, NextBatchZeroBatchSize) {
+  const int num_columns = 1;
+  const int num_rows = 3;
+  const int num_chunks = 1;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, 
&table));
+
+  const int64_t row_group_size = num_rows;
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             
default_arrow_writer_properties(), &buffer));
+
+  auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ASSERT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), 
&file_reader));
+  std::unique_ptr<arrow::ColumnReader> column_reader;
+  ASSERT_OK(file_reader->GetColumn(0, &column_reader));
+  std::shared_ptr<ChunkedArray> chunked_array;
+  // This is the important part in this test.
+  ASSERT_OK(column_reader->NextBatch(0, &chunked_array));
+  ASSERT_EQ(0, chunked_array->length());
+}
+
+TEST(TestArrowColumnReader, NextBatchEmptyInput) {
+  const int num_columns = 1;
+  // This is the important part in this test.
+  const int num_rows = 0;
+  const int num_chunks = 1;
+
+  std::shared_ptr<Table> table;
+  ASSERT_NO_FATAL_FAILURE(MakeDoubleTable(num_columns, num_rows, num_chunks, 
&table));
+
+  const int64_t row_group_size = num_rows;
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, row_group_size,
+                                             
default_arrow_writer_properties(), &buffer));
+
+  auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer));
+  std::unique_ptr<FileReader> file_reader;
+  ASSERT_OK(
+      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), 
&file_reader));
+  std::unique_ptr<arrow::ColumnReader> column_reader;
+  ASSERT_OK(file_reader->GetColumn(0, &column_reader));
+  std::shared_ptr<ChunkedArray> chunked_array;
+  ASSERT_OK(column_reader->NextBatch(10, &chunked_array));
+  ASSERT_EQ(0, chunked_array->length());
+}
+
 // direct-as-possible translation of
 // pyarrow/tests/test_parquet.py::test_validate_schema_write_table
 TEST(TestArrowWriterAdHoc, SchemaMismatch) {
diff --git a/cpp/src/parquet/arrow/arrow_statistics_test.cc 
b/cpp/src/parquet/arrow/arrow_statistics_test.cc
index a8e2287d37..048518644c 100644
--- a/cpp/src/parquet/arrow/arrow_statistics_test.cc
+++ b/cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -185,16 +185,17 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) {
 
 namespace {
 ::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
-    std::shared_ptr<::arrow::DataType> data_type, 
std::shared_ptr<::arrow::Array> array) {
+    std::shared_ptr<::arrow::DataType> data_type, 
std::shared_ptr<::arrow::Array> array,
+    std::shared_ptr<WriterProperties> writer_properties = 
default_writer_properties(),
+    const ArrowReaderProperties& reader_properties = 
default_arrow_reader_properties()) {
   auto schema = ::arrow::schema({::arrow::field("column", data_type)});
   auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), 
{array});
   ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
   const auto arrow_writer_properties =
       parquet::ArrowWriterProperties::Builder().store_schema()->build();
-  ARROW_ASSIGN_OR_RAISE(
-      auto writer,
-      FileWriter::Open(*schema, ::arrow::default_memory_pool(), sink,
-                       default_writer_properties(), arrow_writer_properties));
+  ARROW_ASSIGN_OR_RAISE(auto writer,
+                        FileWriter::Open(*schema, 
::arrow::default_memory_pool(), sink,
+                                         writer_properties, 
arrow_writer_properties));
   ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch));
   ARROW_RETURN_NOT_OK(writer->Close());
   ARROW_ASSIGN_OR_RAISE(auto buffer, sink->Finish());
@@ -202,8 +203,8 @@ namespace {
   auto reader =
       
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
   std::unique_ptr<FileReader> file_reader;
-  ARROW_RETURN_NOT_OK(
-      FileReader::Make(::arrow::default_memory_pool(), std::move(reader), 
&file_reader));
+  ARROW_RETURN_NOT_OK(FileReader::Make(::arrow::default_memory_pool(), 
std::move(reader),
+                                       reader_properties, &file_reader));
   std::shared_ptr<::arrow::ChunkedArray> chunked_array;
   ARROW_RETURN_NOT_OK(file_reader->ReadColumn(0, &chunked_array));
   return chunked_array->chunk(0);
@@ -326,4 +327,44 @@ TEST(TestStatisticsRead, Duration) {
       ::arrow::duration(::arrow::TimeUnit::NANO));
 }
 
+TEST(TestStatisticsRead, MultipleRowGroupsDefault) {
+  auto arrow_type = ::arrow::int32();
+  auto built_array = ArrayFromJSON(arrow_type, R"([1, null, -1])");
+  auto writer_properties = 
WriterProperties::Builder().max_row_group_length(2)->build();
+  ASSERT_OK_AND_ASSIGN(
+      auto read_array,
+      StatisticsReadArray(arrow_type, std::move(built_array), 
writer_properties));
+  auto typed_read_array = 
std::static_pointer_cast<::arrow::Int32Array>(read_array);
+  auto statistics = typed_read_array->statistics();
+  ASSERT_EQ(nullptr, statistics);
+}
+
+TEST(TestStatisticsRead, MultipleRowGroupsShouldLoadStatistics) {
+  auto arrow_type = ::arrow::int32();
+  auto built_array = ArrayFromJSON(arrow_type, R"([1, null, -1])");
+  auto writer_properties = 
WriterProperties::Builder().max_row_group_length(2)->build();
+  ArrowReaderProperties reader_properties;
+  reader_properties.set_should_load_statistics(true);
+  ASSERT_OK_AND_ASSIGN(auto read_array,
+                       StatisticsReadArray(arrow_type, std::move(built_array),
+                                           writer_properties, 
reader_properties));
+  // If we use should_load_statistics, reader doesn't load multiple
+  // row groups at once. So the first array in the read chunked array
+  // has only 2 elements.
+  ASSERT_EQ(2, read_array->length());
+  auto typed_read_array = 
std::static_pointer_cast<::arrow::Int32Array>(read_array);
+  auto statistics = typed_read_array->statistics();
+  ASSERT_NE(nullptr, statistics);
+  ASSERT_EQ(true, statistics->null_count.has_value());
+  ASSERT_EQ(1, statistics->null_count.value());
+  ASSERT_EQ(false, statistics->distinct_count.has_value());
+  ASSERT_EQ(true, statistics->min.has_value());
+  // This is not -1 because this array has only the first 2 elements.
+  ASSERT_EQ(1, std::get<int64_t>(*statistics->min));
+  ASSERT_EQ(true, statistics->is_min_exact);
+  ASSERT_EQ(true, statistics->max.has_value());
+  ASSERT_EQ(1, std::get<int64_t>(*statistics->max));
+  ASSERT_EQ(true, statistics->is_max_exact);
+}
+
 }  // namespace parquet::arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 465ad5844d..03b725beb2 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -218,6 +218,7 @@ class FileReaderImpl : public FileReader {
     ctx->iterator_factory = SomeRowGroupsFactory(row_groups);
     ctx->filter_leaves = true;
     ctx->included_leaves = included_leaves;
+    ctx->reader_properties = &reader_properties_;
     return GetReader(manifest_.schema_fields[i], ctx, out);
   }
 
@@ -475,6 +476,8 @@ class LeafReader : public ColumnReaderImpl {
     record_reader_->Reset();
     // Pre-allocation gives much better performance for flat columns
     record_reader_->Reserve(records_to_read);
+    const bool should_load_statistics = 
ctx_->reader_properties->should_load_statistics();
+    int64_t num_target_row_groups = 0;
     while (records_to_read > 0) {
       if (!record_reader_->HasMoreData()) {
         break;
@@ -483,11 +486,21 @@ class LeafReader : public ColumnReaderImpl {
       records_to_read -= records_read;
       if (records_read == 0) {
         NextRowGroup();
+      } else {
+        num_target_row_groups++;
+        // We can't mix multiple row groups when we load statistics
+        // because statistics are associated with a row group. If we
+        // want to mix multiple row groups and keep valid statistics,
+        // we need to implement a statistics merge logic.
+        if (should_load_statistics) {
+          break;
+        }
       }
     }
-    RETURN_NOT_OK(TransferColumnData(record_reader_.get(),
-                                     input_->column_chunk_metadata(), field_, 
descr_,
-                                     ctx_.get(), &out_));
+    RETURN_NOT_OK(TransferColumnData(
+        record_reader_.get(),
+        num_target_row_groups == 1 ? input_->column_chunk_metadata() : 
nullptr, field_,
+        descr_, ctx_.get(), &out_));
     return Status::OK();
     END_PARQUET_CATCH_EXCEPTIONS
   }
@@ -1214,6 +1227,7 @@ Status FileReaderImpl::GetColumn(int i, 
FileColumnIteratorFactory iterator_facto
   ctx->pool = pool_;
   ctx->iterator_factory = iterator_factory;
   ctx->filter_leaves = false;
+  ctx->reader_properties = &reader_properties_;
   std::unique_ptr<ColumnReaderImpl> result;
   RETURN_NOT_OK(GetReader(manifest_.schema_fields[i], ctx, &result));
   *out = std::move(result);
diff --git a/cpp/src/parquet/arrow/reader_internal.cc 
b/cpp/src/parquet/arrow/reader_internal.cc
index 9d3171ea1a..59fe2b4600 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -322,6 +322,10 @@ template <typename ArrowType, typename ParquetType>
 void AttachStatistics(::arrow::ArrayData* data,
                       std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
                       const ReaderContext* ctx) {
+  if (!metadata) {
+    return;
+  }
+
   using ArrowCType = typename ArrowType::c_type;
 
   auto statistics = metadata->statistics().get();
diff --git a/cpp/src/parquet/arrow/reader_internal.h 
b/cpp/src/parquet/arrow/reader_internal.h
index fab56c8880..4ee3bf98bc 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -117,6 +117,7 @@ struct ReaderContext {
   FileColumnIteratorFactory iterator_factory;
   bool filter_leaves;
   std::shared_ptr<std::unordered_set<int>> included_leaves;
+  ArrowReaderProperties* reader_properties;
 
   bool IncludesLeaf(int leaf_index) const {
     if (this->filter_leaves) {
diff --git a/cpp/src/parquet/arrow/test_util.h 
b/cpp/src/parquet/arrow/test_util.h
index c8fcbbb65d..cfc57ce6ea 100644
--- a/cpp/src/parquet/arrow/test_util.h
+++ b/cpp/src/parquet/arrow/test_util.h
@@ -229,7 +229,9 @@ template <typename ArrowType>
   }
 
   ::arrow::NumericBuilder<ArrowType> builder;
-  RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), 
valid_bytes.data()));
+  if (values.size() > 0) {
+    RETURN_NOT_OK(builder.AppendValues(values.data(), values.size(), 
valid_bytes.data()));
+  }
   return builder.Finish(out);
 }
 
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 8ae3660014..19436b84a3 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -913,7 +913,8 @@ class PARQUET_EXPORT ArrowReaderProperties {
         pre_buffer_(true),
         cache_options_(::arrow::io::CacheOptions::LazyDefaults()),
         coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO),
-        arrow_extensions_enabled_(false) {}
+        arrow_extensions_enabled_(false),
+        should_load_statistics_(false) {}
 
   /// \brief Set whether to use the IO thread pool to parse columns in 
parallel.
   ///
@@ -996,6 +997,15 @@ class PARQUET_EXPORT ArrowReaderProperties {
   }
   bool get_arrow_extensions_enabled() const { return 
arrow_extensions_enabled_; }
 
+  /// \brief Set whether to load statistics as much as possible.
+  ///
+  /// Default is false.
+  void set_should_load_statistics(bool should_load_statistics) {
+    should_load_statistics_ = should_load_statistics;
+  }
+  /// Return whether loading statistics as much as possible.
+  bool should_load_statistics() const { return should_load_statistics_; }
+
  private:
   bool use_threads_;
   std::unordered_set<int> read_dict_indices_;
@@ -1005,6 +1015,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
   ::arrow::io::CacheOptions cache_options_;
   ::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
   bool arrow_extensions_enabled_;
+  bool should_load_statistics_;
 };
 
 /// EXPERIMENTAL: Constructs the default ArrowReaderProperties

Reply via email to