This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new a15956f697 GH-43983: [C++][Parquet] Add support for
arrow::ArrayStatistics: zero-copy types (#43984)
a15956f697 is described below
commit a15956f697dddce4a08198ff3d36ac3e326d069e
Author: Sutou Kouhei <[email protected]>
AuthorDate: Mon Sep 9 08:58:25 2024 +0900
GH-43983: [C++][Parquet] Add support for arrow::ArrayStatistics: zero-copy
types (#43984)
### Rationale for this change
Statistics are useful for fast processing.
Target types:
* `Int32`
* `Int64`
* `Float`
* `Double`
* `Timestamp[milli]`
* `Timestamp[micro]`
* `Timestamp[nano]`
### What changes are included in this PR?
Map `ColumnChunkMetaData` information to `arrow::ArrayStatistics`.
### Are these changes tested?
Yes.
### Are there any user-facing changes?
Yes.
* GitHub Issue: #43983
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/parquet/arrow/arrow_statistics_test.cc | 66 ++++++++++++++----
cpp/src/parquet/arrow/reader_internal.cc | 96 +++++++++++++++++---------
2 files changed, 117 insertions(+), 45 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow_statistics_test.cc
b/cpp/src/parquet/arrow/arrow_statistics_test.cc
index 2638358f1c..5011bf8911 100644
--- a/cpp/src/parquet/arrow/arrow_statistics_test.cc
+++ b/cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -18,6 +18,8 @@
#include "gtest/gtest.h"
#include "arrow/array.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/array/builder_time.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
@@ -183,9 +185,8 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) {
namespace {
::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
- std::shared_ptr<::arrow::DataType> data_type, const std::string& json) {
+ std::shared_ptr<::arrow::DataType> data_type,
std::shared_ptr<::arrow::Array> array) {
auto schema = ::arrow::schema({::arrow::field("column", data_type)});
- auto array = ::arrow::ArrayFromJSON(data_type, json);
auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(),
{array});
ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
const auto arrow_writer_properties =
@@ -211,21 +212,27 @@ namespace {
template <typename ArrowType, typename MinMaxType>
void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) {
using ArrowArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+ using ArrowArrayBuilder = typename
::arrow::TypeTraits<ArrowType>::BuilderType;
using ArrowCType = typename ArrowType::c_type;
- constexpr auto min = std::numeric_limits<ArrowCType>::min();
+ constexpr auto min = std::numeric_limits<ArrowCType>::lowest();
constexpr auto max = std::numeric_limits<ArrowCType>::max();
- std::string json;
- json += "[";
- json += std::to_string(max);
- json += ", null, ";
- json += std::to_string(min);
- json += ", ";
- json += std::to_string(max);
- json += "]";
- ASSERT_OK_AND_ASSIGN(auto array, StatisticsReadArray(arrow_type, json));
- auto typed_array = std::static_pointer_cast<ArrowArrayType>(array);
- auto statistics = typed_array->statistics();
+ std::unique_ptr<ArrowArrayBuilder> builder;
+ if constexpr (::arrow::TypeTraits<ArrowType>::is_parameter_free) {
+ builder =
std::make_unique<ArrowArrayBuilder>(::arrow::default_memory_pool());
+ } else {
+ builder =
+ std::make_unique<ArrowArrayBuilder>(arrow_type,
::arrow::default_memory_pool());
+ }
+ ASSERT_OK(builder->Append(max));
+ ASSERT_OK(builder->AppendNull());
+ ASSERT_OK(builder->Append(min));
+ ASSERT_OK(builder->Append(max));
+ ASSERT_OK_AND_ASSIGN(auto built_array, builder->Finish());
+ ASSERT_OK_AND_ASSIGN(auto read_array,
+ StatisticsReadArray(arrow_type,
std::move(built_array)));
+ auto typed_read_array = std::static_pointer_cast<ArrowArrayType>(read_array);
+ auto statistics = typed_read_array->statistics();
ASSERT_NE(nullptr, statistics);
ASSERT_EQ(true, statistics->null_count.has_value());
ASSERT_EQ(1, statistics->null_count.value());
@@ -257,14 +264,30 @@ TEST(TestStatisticsRead, UInt16) {
TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16());
}
+TEST(TestStatisticsRead, Int32) {
+ TestStatisticsReadArray<::arrow::Int32Type, int64_t>(::arrow::int32());
+}
+
TEST(TestStatisticsRead, UInt32) {
TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32());
}
+TEST(TestStatisticsRead, Int64) {
+ TestStatisticsReadArray<::arrow::Int64Type, int64_t>(::arrow::int64());
+}
+
TEST(TestStatisticsRead, UInt64) {
TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64());
}
+TEST(TestStatisticsRead, Float) {
+ TestStatisticsReadArray<::arrow::FloatType, double>(::arrow::float32());
+}
+
+TEST(TestStatisticsRead, Double) {
+ TestStatisticsReadArray<::arrow::DoubleType, double>(::arrow::float64());
+}
+
TEST(TestStatisticsRead, Date32) {
TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32());
}
@@ -279,6 +302,21 @@ TEST(TestStatisticsRead, Time64) {
::arrow::time64(::arrow::TimeUnit::MICRO));
}
+TEST(TestStatisticsRead, TimestampMilli) {
+ TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
+ ::arrow::timestamp(::arrow::TimeUnit::MILLI));
+}
+
+TEST(TestStatisticsRead, TimestampMicro) {
+ TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
+ ::arrow::timestamp(::arrow::TimeUnit::MICRO));
+}
+
+TEST(TestStatisticsRead, TimestampNano) {
+ TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
+ ::arrow::timestamp(::arrow::TimeUnit::NANO));
+}
+
TEST(TestStatisticsRead, Duration) {
TestStatisticsReadArray<::arrow::DurationType, int64_t>(
::arrow::duration(::arrow::TimeUnit::NANO));
diff --git a/cpp/src/parquet/arrow/reader_internal.cc
b/cpp/src/parquet/arrow/reader_internal.cc
index e6c2d95e1f..aa84a7a92b 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -319,30 +319,20 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector*
chunks) {
}
template <typename ArrowType, typename ParquetType>
-Status TransferInt(RecordReader* reader,
- std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
- const ReaderContext* ctx, const std::shared_ptr<Field>&
field,
- Datum* out) {
+void AttachStatistics(::arrow::ArrayData* data,
+ std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+ const ReaderContext* ctx) {
using ArrowCType = typename ArrowType::c_type;
- using ParquetCType = typename ParquetType::c_type;
- int64_t length = reader->values_written();
- ARROW_ASSIGN_OR_RAISE(auto data,
- ::arrow::AllocateBuffer(length * sizeof(ArrowCType),
ctx->pool));
- auto values = reinterpret_cast<const ParquetCType*>(reader->values());
- auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
- std::copy(values, values + length, out_ptr);
- int64_t null_count = 0;
- std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
- if (field->nullable()) {
- null_count = reader->null_count();
- buffers[0] = reader->ReleaseIsValid();
+ auto statistics = metadata->statistics().get();
+ if (data->null_count == ::arrow::kUnknownNullCount && !statistics) {
+ return;
}
- auto array_data =
- ::arrow::ArrayData::Make(field->type(), length, std::move(buffers),
null_count);
+
auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
- array_statistics->null_count = null_count;
- auto statistics = metadata->statistics().get();
+ if (data->null_count != ::arrow::kUnknownNullCount) {
+ array_statistics->null_count = data->null_count;
+ }
if (statistics) {
if (statistics->HasDistinctCount()) {
array_statistics->distinct_count = statistics->distinct_count();
@@ -352,17 +342,21 @@ Status TransferInt(RecordReader* reader,
static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics);
const ArrowCType min = typed_statistics->min();
const ArrowCType max = typed_statistics->max();
- if (std::is_signed<ArrowCType>::value) {
+ if (std::is_floating_point<ArrowCType>::value) {
+ array_statistics->min = static_cast<double>(min);
+ array_statistics->max = static_cast<double>(max);
+ } else if (std::is_signed<ArrowCType>::value) {
array_statistics->min = static_cast<int64_t>(min);
array_statistics->max = static_cast<int64_t>(max);
} else {
array_statistics->min = static_cast<uint64_t>(min);
array_statistics->max = static_cast<uint64_t>(max);
}
- // We can assume that integer based min/max are always exact if
- // they exist. Apache Parquet's "Statistics" has
- // "is_min_value_exact" and "is_max_value_exact" but we can
- // ignore them for integer based min/max.
+ // We can assume that integer and floating point number based
+ // min/max are always exact if they exist. Apache Parquet's
+ // "Statistics" has "is_min_value_exact" and
+ // "is_max_value_exact" but we can ignore them for integer and
+ // floating point number based min/max.
//
// See also the discussion at [email protected]:
// https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4
@@ -370,13 +364,41 @@ Status TransferInt(RecordReader* reader,
array_statistics->is_max_exact = true;
}
}
- array_data->statistics = std::move(array_statistics);
+
+ data->statistics = std::move(array_statistics);
+}
+
+template <typename ArrowType, typename ParquetType>
+Status TransferInt(RecordReader* reader,
+ std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+ const ReaderContext* ctx, const std::shared_ptr<Field>&
field,
+ Datum* out) {
+ using ArrowCType = typename ArrowType::c_type;
+ using ParquetCType = typename ParquetType::c_type;
+ int64_t length = reader->values_written();
+ ARROW_ASSIGN_OR_RAISE(auto data,
+ ::arrow::AllocateBuffer(length * sizeof(ArrowCType),
ctx->pool));
+
+ auto values = reinterpret_cast<const ParquetCType*>(reader->values());
+ auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
+ std::copy(values, values + length, out_ptr);
+ int64_t null_count = 0;
+ std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
+ if (field->nullable()) {
+ null_count = reader->null_count();
+ buffers[0] = reader->ReleaseIsValid();
+ }
+ auto array_data =
+ ::arrow::ArrayData::Make(field->type(), length, std::move(buffers),
null_count);
+ AttachStatistics<ArrowType, ParquetType>(array_data.get(),
std::move(metadata), ctx);
*out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data));
return Status::OK();
}
-std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
- const std::shared_ptr<Field>& field) {
+template <typename ArrowType, typename ParquetType>
+std::shared_ptr<Array> TransferZeroCopy(
+ RecordReader* reader, std::unique_ptr<::parquet::ColumnChunkMetaData>
metadata,
+ const ReaderContext* ctx, const std::shared_ptr<Field>& field) {
std::shared_ptr<::arrow::ArrayData> data;
if (field->nullable()) {
std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
@@ -388,7 +410,8 @@ std::shared_ptr<Array> TransferZeroCopy(RecordReader*
reader,
data = std::make_shared<::arrow::ArrayData>(field->type(),
reader->values_written(),
std::move(buffers),
/*null_count=*/0);
}
- return ::arrow::MakeArray(data);
+ AttachStatistics<ArrowType, ParquetType>(data.get(), std::move(metadata),
ctx);
+ return ::arrow::MakeArray(std::move(data));
}
Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool,
Datum* out) {
@@ -794,10 +817,20 @@ Status TransferColumnData(RecordReader* reader,
break;
}
case ::arrow::Type::INT32:
+ result = TransferZeroCopy<::arrow::Int32Type, Int32Type>(
+ reader, std::move(metadata), ctx, value_field);
+ break;
case ::arrow::Type::INT64:
+ result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
+ reader, std::move(metadata), ctx, value_field);
+ break;
case ::arrow::Type::FLOAT:
+ result = TransferZeroCopy<::arrow::FloatType, FloatType>(
+ reader, std::move(metadata), ctx, value_field);
+ break;
case ::arrow::Type::DOUBLE:
- result = TransferZeroCopy(reader, value_field);
+ result = TransferZeroCopy<::arrow::DoubleType, DoubleType>(
+ reader, std::move(metadata), ctx, value_field);
break;
case ::arrow::Type::BOOL:
RETURN_NOT_OK(TransferBool(reader, value_field->nullable(), pool,
&result));
@@ -895,7 +928,8 @@ Status TransferColumnData(RecordReader* reader,
case ::arrow::TimeUnit::MILLI:
case ::arrow::TimeUnit::MICRO:
case ::arrow::TimeUnit::NANO:
- result = TransferZeroCopy(reader, value_field);
+ result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
+ reader, std::move(metadata), ctx, value_field);
break;
default:
return Status::NotImplemented("TimeUnit not supported");