This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 65f7787522 GH-47955: [C++][Parquet] Support reading INT-encoded
Decimal stats as Arrow scalar (#48001)
65f7787522 is described below
commit 65f77875223f3fa5948c471ff462df2c9bb23702
Author: Antoine Pitrou <[email protected]>
AuthorDate: Fri Oct 31 09:11:47 2025 +0100
GH-47955: [C++][Parquet] Support reading INT-encoded Decimal stats as Arrow
scalar (#48001)
### Rationale for this change
The `StatisticsAsScalars` function, which allows converting Parquet
statistics (min/max values) for a given logical type into Arrow scalars, did
not support DECIMAL columns with physical type INT32 or INT64.
### Are these changes tested?
Yes, by an expanded unit test.
### Are there any user-facing changes?
No, just a bug fix.
* GitHub Issue: #47955
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 59 ++++++-----
cpp/src/parquet/arrow/reader_internal.cc | 117 +++++++++++++---------
2 files changed, 105 insertions(+), 71 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index e081b428e2..cd69b2f946 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -400,16 +400,23 @@ using ParquetDataType =
PhysicalType<test_traits<T>::parquet_enum>;
template <typename T>
using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
+Result<std::shared_ptr<Buffer>> WriteTableToBuffer(
+ const std::shared_ptr<Table>& table, int64_t row_group_size,
+ const std::shared_ptr<WriterProperties>& properties =
default_writer_properties(),
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+ default_arrow_writer_properties()) {
+ auto sink = CreateOutputStream();
+ ARROW_RETURN_NOT_OK(WriteTable(*table, ::arrow::default_memory_pool(), sink,
+ row_group_size, properties,
arrow_properties));
+ return sink->Finish();
+}
+
void WriteTableToBuffer(const std::shared_ptr<Table>& table, int64_t
row_group_size,
const std::shared_ptr<ArrowWriterProperties>&
arrow_properties,
std::shared_ptr<Buffer>* out) {
- auto sink = CreateOutputStream();
-
auto write_props =
WriterProperties::Builder().write_batch_size(100)->build();
-
- ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
- row_group_size, write_props,
arrow_properties));
- ASSERT_OK_AND_ASSIGN(*out, sink->Finish());
+ ASSERT_OK_AND_ASSIGN(
+ *out, WriteTableToBuffer(table, row_group_size, write_props,
arrow_properties));
}
void DoRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
@@ -3101,27 +3108,33 @@ TEST(ArrowReadWrite, DecimalStats) {
using ::arrow::Decimal128;
using ::arrow::field;
- auto type = ::arrow::decimal128(/*precision=*/8, /*scale=*/0);
-
- const char* json = R"(["255", "128", null, "0", "1", "-127", "-128", "-129",
"-255"])";
- auto array = ::arrow::ArrayFromJSON(type, json);
- auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}),
{array});
+ // Try various precisions to trigger encoding as different physical types:
+ // - precision 8 should use INT32
+ // - precision 18 should use INT64
+ // - precision 35 should use FIXED_LEN_BYTE_ARRAY
+ for (const int precision : {8, 18, 35}) {
+ auto type = ::arrow::decimal128(precision, /*scale=*/0);
- std::shared_ptr<Buffer> buffer;
- ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, /*row_group_size=*/100,
-
default_arrow_writer_properties(), &buffer));
+ const char* json =
+ R"(["255", "128", null, "0", "1", "-127", "-128", "-129", "-255"])";
+ auto array = ::arrow::ArrayFromJSON(type, json);
+ auto table = ::arrow::Table::Make(::arrow::schema({field("root", type)}),
{array});
- ASSERT_OK_AND_ASSIGN(auto reader,
OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool()));
+ auto props =
WriterProperties::Builder().enable_store_decimal_as_integer()->build();
+ ASSERT_OK_AND_ASSIGN(auto buffer,
+ WriteTableToBuffer(table, /*row_group_size=*/100,
props));
+ ASSERT_OK_AND_ASSIGN(auto reader,
OpenFile(std::make_shared<BufferReader>(buffer),
+
::arrow::default_memory_pool()));
- std::shared_ptr<Scalar> min, max;
- ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
+ std::shared_ptr<Scalar> min, max;
+ ReadSingleColumnFileStatistics(std::move(reader), &min, &max);
- std::shared_ptr<Scalar> expected_min, expected_max;
- ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1));
- ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0));
- ::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true);
- ::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true);
+ std::shared_ptr<Scalar> expected_min, expected_max;
+ ASSERT_OK_AND_ASSIGN(expected_min, array->GetScalar(array->length() - 1));
+ ASSERT_OK_AND_ASSIGN(expected_max, array->GetScalar(0));
+ ::arrow::AssertScalarsEqual(*expected_min, *min, /*verbose=*/true);
+ ::arrow::AssertScalarsEqual(*expected_max, *max, /*verbose=*/true);
+ }
}
TEST(ArrowReadWrite, NestedNullableField) {
diff --git a/cpp/src/parquet/arrow/reader_internal.cc
b/cpp/src/parquet/arrow/reader_internal.cc
index b622e93e07..12f36fe39c 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -86,6 +86,7 @@ using arrow::Table;
using arrow::TimestampArray;
using ::arrow::bit_util::FromBigEndian;
+using ::arrow::bit_util::ToBigEndian;
using ::arrow::internal::checked_cast;
using ::arrow::internal::checked_pointer_cast;
using ::arrow::internal::SafeLeftShift;
@@ -108,6 +109,62 @@ namespace {
template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+template <typename DecimalType>
+Result<std::shared_ptr<::arrow::Scalar>> DecimalScalarFromBigEndianBytes(
+ std::string_view data, std::shared_ptr<DataType> arrow_type) {
+ ARROW_ASSIGN_OR_RAISE(
+ DecimalType decimal,
+ DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
+ static_cast<int32_t>(data.size())));
+ return ::arrow::MakeScalar(std::move(arrow_type), decimal);
+}
+
+// Extract Min and Max scalars from big-endian representation of Decimals.
+Status ExtractDecimalMinMaxFromBytes(std::string_view min_bytes,
+ std::string_view max_bytes,
+ const LogicalType& logical_type,
+ std::shared_ptr<::arrow::Scalar>* min,
+ std::shared_ptr<::arrow::Scalar>* max) {
+ const DecimalLogicalType& decimal_type =
+ checked_cast<const DecimalLogicalType&>(logical_type);
+
+ Result<std::shared_ptr<DataType>> maybe_type =
+ Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
+ std::shared_ptr<DataType> arrow_type;
+ if (maybe_type.ok()) {
+ arrow_type = maybe_type.ValueOrDie();
+ ARROW_ASSIGN_OR_RAISE(
+ *min, DecimalScalarFromBigEndianBytes<Decimal128>(min_bytes,
arrow_type));
+ ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes<Decimal128>(
+ max_bytes, std::move(arrow_type)));
+ return Status::OK();
+ }
+ // Fallback to see if Decimal256 can represent the type.
+ ARROW_ASSIGN_OR_RAISE(
+ arrow_type, Decimal256Type::Make(decimal_type.precision(),
decimal_type.scale()));
+ ARROW_ASSIGN_OR_RAISE(
+ *min, DecimalScalarFromBigEndianBytes<Decimal256>(min_bytes,
arrow_type));
+ ARROW_ASSIGN_OR_RAISE(*max, DecimalScalarFromBigEndianBytes<Decimal256>(
+ max_bytes, std::move(arrow_type)));
+
+ return Status::OK();
+}
+
+template <typename Int>
+Status ExtractDecimalMinMaxFromInteger(Int min_value, Int max_value,
+ const LogicalType& logical_type,
+ std::shared_ptr<::arrow::Scalar>* min,
+ std::shared_ptr<::arrow::Scalar>* max) {
+ static_assert(std::is_integral_v<Int>);
+ const Int min_be = ToBigEndian(min_value);
+ const Int max_be = ToBigEndian(max_value);
+ const auto min_bytes =
+ std::string_view(reinterpret_cast<const char*>(&min_be), sizeof(min_be));
+ const auto max_bytes =
+ std::string_view(reinterpret_cast<const char*>(&max_be), sizeof(max_be));
+ return ExtractDecimalMinMaxFromBytes(min_bytes, max_bytes, logical_type,
min, max);
+}
+
template <typename CType, typename StatisticsType>
Status MakeMinMaxScalar(const StatisticsType& statistics,
std::shared_ptr<::arrow::Scalar>* min,
@@ -165,17 +222,19 @@ static Status FromInt32Statistics(const Int32Statistics&
statistics,
switch (logical_type.type()) {
case LogicalType::Type::INT:
return MakeMinMaxIntegralScalar(statistics, *type, min, max);
- break;
case LogicalType::Type::DATE:
case LogicalType::Type::TIME:
case LogicalType::Type::NONE:
return MakeMinMaxTypedScalar<int32_t>(statistics, type, min, max);
- break;
+ case LogicalType::Type::DECIMAL:
+ return ExtractDecimalMinMaxFromInteger(statistics.min(),
statistics.max(),
+ logical_type, min, max);
default:
break;
}
- return Status::NotImplemented("Cannot extract statistics for type ");
+ return Status::NotImplemented("Cannot extract statistics for INT32 with
logical type ",
+ logical_type.ToString());
}
static Status FromInt64Statistics(const Int64Statistics& statistics,
@@ -188,58 +247,19 @@ static Status FromInt64Statistics(const Int64Statistics&
statistics,
switch (logical_type.type()) {
case LogicalType::Type::INT:
return MakeMinMaxIntegralScalar(statistics, *type, min, max);
- break;
case LogicalType::Type::TIME:
case LogicalType::Type::TIMESTAMP:
case LogicalType::Type::NONE:
return MakeMinMaxTypedScalar<int64_t>(statistics, type, min, max);
- break;
+ case LogicalType::Type::DECIMAL:
+ return ExtractDecimalMinMaxFromInteger(statistics.min(),
statistics.max(),
+ logical_type, min, max);
default:
break;
}
- return Status::NotImplemented("Cannot extract statistics for type ");
-}
-
-template <typename DecimalType>
-Result<std::shared_ptr<::arrow::Scalar>> FromBigEndianString(
- const std::string& data, std::shared_ptr<DataType> arrow_type) {
- ARROW_ASSIGN_OR_RAISE(
- DecimalType decimal,
- DecimalType::FromBigEndian(reinterpret_cast<const uint8_t*>(data.data()),
- static_cast<int32_t>(data.size())));
- return ::arrow::MakeScalar(std::move(arrow_type), decimal);
-}
-
-// Extracts Min and Max scalar from bytes like types (i.e. types where
-// decimal is encoded as little endian.
-Status ExtractDecimalMinMaxFromBytesType(const Statistics& statistics,
- const LogicalType& logical_type,
- std::shared_ptr<::arrow::Scalar>* min,
- std::shared_ptr<::arrow::Scalar>*
max) {
- const DecimalLogicalType& decimal_type =
- checked_cast<const DecimalLogicalType&>(logical_type);
-
- Result<std::shared_ptr<DataType>> maybe_type =
- Decimal128Type::Make(decimal_type.precision(), decimal_type.scale());
- std::shared_ptr<DataType> arrow_type;
- if (maybe_type.ok()) {
- arrow_type = maybe_type.ValueOrDie();
- ARROW_ASSIGN_OR_RAISE(
- *min, FromBigEndianString<Decimal128>(statistics.EncodeMin(),
arrow_type));
- ARROW_ASSIGN_OR_RAISE(*max,
FromBigEndianString<Decimal128>(statistics.EncodeMax(),
-
std::move(arrow_type)));
- return Status::OK();
- }
- // Fallback to see if Decimal256 can represent the type.
- ARROW_ASSIGN_OR_RAISE(
- arrow_type, Decimal256Type::Make(decimal_type.precision(),
decimal_type.scale()));
- ARROW_ASSIGN_OR_RAISE(
- *min, FromBigEndianString<Decimal256>(statistics.EncodeMin(),
arrow_type));
- ARROW_ASSIGN_OR_RAISE(*max,
FromBigEndianString<Decimal256>(statistics.EncodeMax(),
-
std::move(arrow_type)));
-
- return Status::OK();
+ return Status::NotImplemented("Cannot extract statistics for INT64 with
logical type ",
+ logical_type.ToString());
}
Status ByteArrayStatisticsAsScalars(const Statistics& statistics,
@@ -247,7 +267,8 @@ Status ByteArrayStatisticsAsScalars(const Statistics&
statistics,
std::shared_ptr<::arrow::Scalar>* max) {
auto logical_type = statistics.descr()->logical_type();
if (logical_type->type() == LogicalType::Type::DECIMAL) {
- return ExtractDecimalMinMaxFromBytesType(statistics, *logical_type, min,
max);
+ return ExtractDecimalMinMaxFromBytes(statistics.EncodeMin(),
statistics.EncodeMax(),
+ *logical_type, min, max);
}
std::shared_ptr<::arrow::DataType> type;
if (statistics.descr()->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {