This is an automated email from the ASF dual-hosted git repository.

pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 65f7787522 GH-47955: [C++][Parquet] Support reading INT-encoded Decimal stats as Arrow scalar (#48001)
65f7787522 is described below

commit 65f77875223f3fa5948c471ff462df2c9bb23702
Author: Antoine Pitrou <[email protected]>
AuthorDate: Fri Oct 31 09:11:47 2025 +0100

    GH-47955: [C++][Parquet] Support reading INT-encoded Decimal stats as Arrow scalar (#48001)
    
    ### Rationale for this change
    
    The `StatisticsAsScalars` function, which allows converting Parquet statistics (min/max values) for a given logical type into Arrow scalars, did not support DECIMAL columns with physical type INT32 or INT64.
    
    ### Are these changes tested?
    
    Yes, by expanded unit test.
    
    ### Are there any user-facing changes?
    
    No, just a bug fix.
    
    * GitHub Issue: #47955
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc |  59 ++++++-----
 cpp/src/parquet/arrow/reader_internal.cc          | 117 +++++++++++++---------
 2 files changed, 105 insertions(+), 71 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index e081b428e2..cd69b2f946 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -400,16 +400,23 @@ using ParquetDataType = PhysicalType<test_traits<T>::parquet_enum>;
 template <typename T>
 using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
 
+Result<std::shared_ptr<Buffer>> WriteTableToBuffer(
+    const std::shared_ptr<Table>& table, int64_t row_group_size,
+    const std::shared_ptr<WriterProperties>& properties = 
default_writer_properties(),
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+        default_arrow_writer_properties()) {
+  auto sink = CreateOutputStream();
+  ARROW_RETURN_NOT_OK(WriteTable(*table, ::arrow::default_memory_pool(), sink,
+                                 row_group_size, properties, 
arrow_properties));
+  return sink->Finish();
+}
+
 void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t 
row_group_size,
                         const std::shared_ptr<ArrowWriterProperties>& 
arrow_properties,
                         std::shared_ptr<Buffer>* out) {
-  auto sink = CreateOutputStream();
-
   auto write_props = 
WriterProperties::Builder().write_batch_size(100)->build();
-
-  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
-                                row_group_size, write_props, 
arrow_properties));
-  ASSERT_OK_AND_ASSIGN(*out, sink->Finish());
+  ASSERT_OK_AND_ASSIGN(
+      *out, WriteTableToBuffer(table, row_group_size, write_props, 
arrow_properties));
 }
 
 void DoRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
@@ -3101,27 +3108,33 @@ TEST(ArrowReadWrite, DecimalStats) {
   using ::arrow::Decimal128;
   using ::arrow::field;
 
-  auto type = ::arrow::decimal128(/*precision=*/8, /*scale=*/0);
-
-  const char* json = R"(["255", "128", null, "0", "1", "-127", "-128", "-129", 
"-255"])";
-  auto array = ::arrow::ArrayFromJSON(type, json);
-  auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), 
{array});
+  // Try various precisions to trigger encoding as different physical types:
+  // - precision 8 should use INT32
+  // - precision 18 should use INT64
+  // - precision 35 should use FIXED_LEN_BYTE_ARRAY
+  for (const int precision : {8, 18, 35}) {
+    auto type = ::arrow::decimal128(precision, /*scale=*/0);
 
-  std::shared_ptr<Buffer> buffer;
-  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, /*row_group_size=*/100,
-                                             
default_arrow_writer_properties(), &buffer));
+    const char* json =
+        R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])";
+    auto array = ::arrow::ArrayFromJSON(type, json);
+    auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}), 
{array});
 
-  ASSERT_OK_AND_ASSIGN(auto reader, 
OpenFile(std::make_shared<BufferReader>(buffer),
-                                             ::arrow::default_memory_pool()));
+    auto props = 
WriterProperties::Builder().enable_store_decimal_as_integer()->build();
+    ASSERT_OK_AND_ASSIGN(auto buffer,
+                         WriteTableToBuffer(table, /*row_group_size=*/100, 
props));
+    ASSERT_OK_AND_ASSIGN(auto reader, 
OpenFile(std::make_shared<BufferReader>(buffer),
+                                               
::arrow::default_memory_pool()));
 
-  std::shared_ptr<Scalar> min, max;
-  ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
+    std::shared_ptr<Scalar> min, max;
+    ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
 
-  std::shared_ptr<Scalar> expected_min, expected_max;
-  ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1));
-  ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0));
-  ::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true);
-  ::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true);
+    std::shared_ptr<Scalar> expected_min, expected_max;
+    ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1));
+    ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0));
+    ::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true);
+    ::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true);
+  }
 }
 
 TEST(ArrowReadWrite, NestedNullableField) {
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index b622e93e07..12f36fe39c 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -86,6 +86,7 @@ using arrow::Table;
 using arrow::TimestampArray;
 
 using ::arrow::bit_util::FromBigEndian;
+using ::arrow::bit_util::ToBigEndian;
 using ::arrow::internal::checked_cast;
 using ::arrow::internal::checked_pointer_cast;
 using ::arrow::internal::SafeLeftShift;
@@ -108,6 +109,62 @@ namespace {
 template <typename ArrowType>
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
+template <typename DecimalType>
+Result<std::shared_ptr<::arrow::Scalar>> DecimalScalarFromBigEndianBytes(
+    std::string_view data, std::shared_ptr<DataType> arrow_type) {
+  ARROW_ASSIGN_OR_RAISE(
+      DecimalType decimal,
+      DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
+                                 static_cast<int32_t>(data.size())));
+  return ::arrow::MakeScalar(std::move(arrow_type), decimal);
+}
+
+// Extract Min and Max scalars from big-endian representation of Decimals.
+Status ExtractDecimalMinMaxFromBytes(std::string_view min_bytes,
+                                     std::string_view max_bytes,
+                                     const LogicalType& logical_type,
+                                     std::shared_ptr<::arrow::Scalar>* min,
+                                     std::shared_ptr<::arrow::Scalar>* max) {
+  const DecimalLogicalType& decimal_type =
+      checked_cast<const DecimalLogicalType&>(logical_type);
+
+  Result<std::shared_ptr<DataType>> maybe_type =
+      Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
+  std::shared_ptr<DataType> arrow_type;
+  if (maybe_type.ok()) {
+    arrow_type = maybe_type.ValueOrDie();
+    ARROW_ASSIGN_OR_RAISE(
+        *min, DecimalScalarFromBigEndianBytes<Decimal128>(min_bytes, 
arrow_type));
+    ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes<Decimal128>(
+                                    max_bytes, std::move(arrow_type)));
+    return Status::OK();
+  }
+  // Fallback to see if Decimal256 can represent the type.
+  ARROW_ASSIGN_OR_RAISE(
+      arrow_type, Decimal256Type::Make(decimal_type.precision(), 
decimal_type.scale()));
+  ARROW_ASSIGN_OR_RAISE(
+      *min, DecimalScalarFromBigEndianBytes<Decimal256>(min_bytes, 
arrow_type));
+  ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes<Decimal256>(
+                                  max_bytes, std::move(arrow_type)));
+
+  return Status::OK();
+}
+
+template <typename Int>
+Status ExtractDecimalMinMaxFromInteger(Int min_value, Int max_value,
+                                       const LogicalType& logical_type,
+                                       std::shared_ptr<::arrow::Scalar>* min,
+                                       std::shared_ptr<::arrow::Scalar>* max) {
+  static_assert(std::is_integral_v<Int>);
+  const Int min_be = ToBigEndian(min_value);
+  const Int max_be = ToBigEndian(max_value);
+  const auto min_bytes =
+      std::string_view(reinterpret_cast<const char*>(&min_be), sizeof(min_be));
+  const auto max_bytes =
+      std::string_view(reinterpret_cast<const char*>(&max_be), sizeof(max_be));
+  return ExtractDecimalMinMaxFromBytes(min_bytes, max_bytes, logical_type, 
min, max);
+}
+
 template <typename CType, typename StatisticsType>
 Status MakeMinMaxScalar(const StatisticsType& statistics,
                         std::shared_ptr<::arrow::Scalar>* min,
@@ -165,17 +222,19 @@ static Status FromInt32Statistics(const Int32Statistics& 
statistics,
   switch (logical_type.type()) {
     case LogicalType::Type::INT:
       return MakeMinMaxIntegralScalar(statistics, *type, min, max);
-      break;
     case LogicalType::Type::DATE:
     case LogicalType::Type::TIME:
     case LogicalType::Type::NONE:
       return MakeMinMaxTypedScalar<int32_t>(statistics, type, min, max);
-      break;
+    case LogicalType::Type::DECIMAL:
+      return ExtractDecimalMinMaxFromInteger(statistics.min(), 
statistics.max(),
+                                             logical_type, min, max);
     default:
       break;
   }
 
-  return Status::NotImplemented("Cannot extract statistics for type ");
+  return Status::NotImplemented("Cannot extract statistics for INT32 with 
logical type ",
+                                logical_type.ToString());
 }
 
 static Status FromInt64Statistics(const Int64Statistics& statistics,
@@ -188,58 +247,19 @@ static Status FromInt64Statistics(const Int64Statistics& 
statistics,
   switch (logical_type.type()) {
     case LogicalType::Type::INT:
       return MakeMinMaxIntegralScalar(statistics, *type, min, max);
-      break;
     case LogicalType::Type::TIME:
     case LogicalType::Type::TIMESTAMP:
     case LogicalType::Type::NONE:
       return MakeMinMaxTypedScalar<int64_t>(statistics, type, min, max);
-      break;
+    case LogicalType::Type::DECIMAL:
+      return ExtractDecimalMinMaxFromInteger(statistics.min(), 
statistics.max(),
+                                             logical_type, min, max);
     default:
       break;
   }
 
-  return Status::NotImplemented("Cannot extract statistics for type ");
-}
-
-template <typename DecimalType>
-Result<std::shared_ptr<::arrow::Scalar>> FromBigEndianString(
-    const std::string& data, std::shared_ptr<DataType> arrow_type) {
-  ARROW_ASSIGN_OR_RAISE(
-      DecimalType decimal,
-      DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
-                                 static_cast<int32_t>(data.size())));
-  return ::arrow::MakeScalar(std::move(arrow_type), decimal);
-}
-
-// Extracts Min and Max scalar from bytes like types (i.e. types where
-// decimal is encoded as little endian.
-Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics,
-                                         const LogicalType& logical_type,
-                                         std::shared_ptr<::arrow::Scalar>* min,
-                                         std::shared_ptr<::arrow::Scalar>* 
max) {
-  const DecimalLogicalType& decimal_type =
-      checked_cast<const DecimalLogicalType&>(logical_type);
-
-  Result<std::shared_ptr<DataType>> maybe_type =
-      Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
-  std::shared_ptr<DataType> arrow_type;
-  if (maybe_type.ok()) {
-    arrow_type = maybe_type.ValueOrDie();
-    ARROW_ASSIGN_OR_RAISE(
-        *min, FromBigEndianString<Decimal128>(statistics.EncodeMin(), 
arrow_type));
-    ARROW_ASSIGN_OR_RAISE(*max, 
FromBigEndianString<Decimal128>(statistics.EncodeMax(),
-                                                                
std::move(arrow_type)));
-    return Status::OK();
-  }
-  // Fallback to see if Decimal256 can represent the type.
-  ARROW_ASSIGN_OR_RAISE(
-      arrow_type, Decimal256Type::Make(decimal_type.precision(), 
decimal_type.scale()));
-  ARROW_ASSIGN_OR_RAISE(
-      *min, FromBigEndianString<Decimal256>(statistics.EncodeMin(), 
arrow_type));
-  ARROW_ASSIGN_OR_RAISE(*max, 
FromBigEndianString<Decimal256>(statistics.EncodeMax(),
-                                                              
std::move(arrow_type)));
-
-  return Status::OK();
+  return Status::NotImplemented("Cannot extract statistics for INT64 with 
logical type ",
+                                logical_type.ToString());
 }
 
 Status ByteArrayStatisticsAsScalars(const Statistics& statistics,
@@ -247,7 +267,8 @@ Status ByteArrayStatisticsAsScalars(const Statistics& 
statistics,
                                     std::shared_ptr<::arrow::Scalar>* max) {
   auto logical_type = statistics.descr()->logical_type();
   if (logical_type->type() == LogicalType::Type::DECIMAL) {
-    return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min, 
max);
+    return ExtractDecimalMinMaxFromBytes(statistics.EncodeMin(), 
statistics.EncodeMax(),
+                                         *logical_type, min, max);
   }
   std::shared_ptr<::arrow::DataType> type;
   if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {

Reply via email to