This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new a15956f697 GH-43983: [C++][Parquet] Add support for 
arrow::ArrayStatistics: zero-copy types (#43984)
a15956f697 is described below

commit a15956f697dddce4a08198ff3d36ac3e326d069e
Author: Sutou Kouhei <[email protected]>
AuthorDate: Mon Sep 9 08:58:25 2024 +0900

    GH-43983: [C++][Parquet] Add support for arrow::ArrayStatistics: zero-copy 
types (#43984)
    
    ### Rationale for this change
    
    Statistics is useful for fast processing.
    
    Target types:
    
    * `Int32`
    * `Int64`
    * `Float`
    * `Double`
    * `Timestamp[milli]`
    * `Timestamp[micro]`
    * `Timestamp[nano]`
    
    ### What changes are included in this PR?
    
    Map `ColumnChunkMetaData` information to `arrow::ArrayStatistics`.
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes.
    * GitHub Issue: #43983
    
    Authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/parquet/arrow/arrow_statistics_test.cc | 66 ++++++++++++++----
 cpp/src/parquet/arrow/reader_internal.cc       | 96 +++++++++++++++++---------
 2 files changed, 117 insertions(+), 45 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_statistics_test.cc 
b/cpp/src/parquet/arrow/arrow_statistics_test.cc
index 2638358f1c..5011bf8911 100644
--- a/cpp/src/parquet/arrow/arrow_statistics_test.cc
+++ b/cpp/src/parquet/arrow/arrow_statistics_test.cc
@@ -18,6 +18,8 @@
 #include "gtest/gtest.h"
 
 #include "arrow/array.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/array/builder_time.h"
 #include "arrow/table.h"
 #include "arrow/testing/gtest_util.h"
 
@@ -183,9 +185,8 @@ TEST(StatisticsTest, TruncateOnlyHalfMinMax) {
 
 namespace {
 ::arrow::Result<std::shared_ptr<::arrow::Array>> StatisticsReadArray(
-    std::shared_ptr<::arrow::DataType> data_type, const std::string& json) {
+    std::shared_ptr<::arrow::DataType> data_type, 
std::shared_ptr<::arrow::Array> array) {
   auto schema = ::arrow::schema({::arrow::field("column", data_type)});
-  auto array = ::arrow::ArrayFromJSON(data_type, json);
   auto record_batch = ::arrow::RecordBatch::Make(schema, array->length(), 
{array});
   ARROW_ASSIGN_OR_RAISE(auto sink, ::arrow::io::BufferOutputStream::Create());
   const auto arrow_writer_properties =
@@ -211,21 +212,27 @@ namespace {
 template <typename ArrowType, typename MinMaxType>
 void TestStatisticsReadArray(std::shared_ptr<::arrow::DataType> arrow_type) {
   using ArrowArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+  using ArrowArrayBuilder = typename 
::arrow::TypeTraits<ArrowType>::BuilderType;
   using ArrowCType = typename ArrowType::c_type;
-  constexpr auto min = std::numeric_limits<ArrowCType>::min();
+  constexpr auto min = std::numeric_limits<ArrowCType>::lowest();
   constexpr auto max = std::numeric_limits<ArrowCType>::max();
 
-  std::string json;
-  json += "[";
-  json += std::to_string(max);
-  json += ", null, ";
-  json += std::to_string(min);
-  json += ", ";
-  json += std::to_string(max);
-  json += "]";
-  ASSERT_OK_AND_ASSIGN(auto array, StatisticsReadArray(arrow_type, json));
-  auto typed_array = std::static_pointer_cast<ArrowArrayType>(array);
-  auto statistics = typed_array->statistics();
+  std::unique_ptr<ArrowArrayBuilder> builder;
+  if constexpr (::arrow::TypeTraits<ArrowType>::is_parameter_free) {
+    builder = 
std::make_unique<ArrowArrayBuilder>(::arrow::default_memory_pool());
+  } else {
+    builder =
+        std::make_unique<ArrowArrayBuilder>(arrow_type, 
::arrow::default_memory_pool());
+  }
+  ASSERT_OK(builder->Append(max));
+  ASSERT_OK(builder->AppendNull());
+  ASSERT_OK(builder->Append(min));
+  ASSERT_OK(builder->Append(max));
+  ASSERT_OK_AND_ASSIGN(auto built_array, builder->Finish());
+  ASSERT_OK_AND_ASSIGN(auto read_array,
+                       StatisticsReadArray(arrow_type, 
std::move(built_array)));
+  auto typed_read_array = std::static_pointer_cast<ArrowArrayType>(read_array);
+  auto statistics = typed_read_array->statistics();
   ASSERT_NE(nullptr, statistics);
   ASSERT_EQ(true, statistics->null_count.has_value());
   ASSERT_EQ(1, statistics->null_count.value());
@@ -257,14 +264,30 @@ TEST(TestStatisticsRead, UInt16) {
   TestStatisticsReadArray<::arrow::UInt16Type, uint64_t>(::arrow::uint16());
 }
 
+TEST(TestStatisticsRead, Int32) {
+  TestStatisticsReadArray<::arrow::Int32Type, int64_t>(::arrow::int32());
+}
+
 TEST(TestStatisticsRead, UInt32) {
   TestStatisticsReadArray<::arrow::UInt32Type, uint64_t>(::arrow::uint32());
 }
 
+TEST(TestStatisticsRead, Int64) {
+  TestStatisticsReadArray<::arrow::Int64Type, int64_t>(::arrow::int64());
+}
+
 TEST(TestStatisticsRead, UInt64) {
   TestStatisticsReadArray<::arrow::UInt64Type, uint64_t>(::arrow::uint64());
 }
 
+TEST(TestStatisticsRead, Float) {
+  TestStatisticsReadArray<::arrow::FloatType, double>(::arrow::float32());
+}
+
+TEST(TestStatisticsRead, Double) {
+  TestStatisticsReadArray<::arrow::DoubleType, double>(::arrow::float64());
+}
+
 TEST(TestStatisticsRead, Date32) {
   TestStatisticsReadArray<::arrow::Date32Type, int64_t>(::arrow::date32());
 }
@@ -279,6 +302,21 @@ TEST(TestStatisticsRead, Time64) {
       ::arrow::time64(::arrow::TimeUnit::MICRO));
 }
 
+TEST(TestStatisticsRead, TimestampMilli) {
+  TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
+      ::arrow::timestamp(::arrow::TimeUnit::MILLI));
+}
+
+TEST(TestStatisticsRead, TimestampMicro) {
+  TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
+      ::arrow::timestamp(::arrow::TimeUnit::MICRO));
+}
+
+TEST(TestStatisticsRead, TimestampNano) {
+  TestStatisticsReadArray<::arrow::TimestampType, int64_t>(
+      ::arrow::timestamp(::arrow::TimeUnit::NANO));
+}
+
 TEST(TestStatisticsRead, Duration) {
   TestStatisticsReadArray<::arrow::DurationType, int64_t>(
       ::arrow::duration(::arrow::TimeUnit::NANO));
diff --git a/cpp/src/parquet/arrow/reader_internal.cc 
b/cpp/src/parquet/arrow/reader_internal.cc
index e6c2d95e1f..aa84a7a92b 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -319,30 +319,20 @@ void ReconstructChunksWithoutNulls(::arrow::ArrayVector* 
chunks) {
 }
 
 template <typename ArrowType, typename ParquetType>
-Status TransferInt(RecordReader* reader,
-                   std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
-                   const ReaderContext* ctx, const std::shared_ptr<Field>& 
field,
-                   Datum* out) {
+void AttachStatistics(::arrow::ArrayData* data,
+                      std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+                      const ReaderContext* ctx) {
   using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
-  int64_t length = reader->values_written();
-  ARROW_ASSIGN_OR_RAISE(auto data,
-                        ::arrow::AllocateBuffer(length * sizeof(ArrowCType), 
ctx->pool));
 
-  auto values = reinterpret_cast<const ParquetCType*>(reader->values());
-  auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
-  std::copy(values, values + length, out_ptr);
-  int64_t null_count = 0;
-  std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
-  if (field->nullable()) {
-    null_count = reader->null_count();
-    buffers[0] = reader->ReleaseIsValid();
+  auto statistics = metadata->statistics().get();
+  if (data->null_count == ::arrow::kUnknownNullCount && !statistics) {
+    return;
   }
-  auto array_data =
-      ::arrow::ArrayData::Make(field->type(), length, std::move(buffers), 
null_count);
+
   auto array_statistics = std::make_shared<::arrow::ArrayStatistics>();
-  array_statistics->null_count = null_count;
-  auto statistics = metadata->statistics().get();
+  if (data->null_count != ::arrow::kUnknownNullCount) {
+    array_statistics->null_count = data->null_count;
+  }
   if (statistics) {
     if (statistics->HasDistinctCount()) {
       array_statistics->distinct_count = statistics->distinct_count();
@@ -352,17 +342,21 @@ Status TransferInt(RecordReader* reader,
           static_cast<::parquet::TypedStatistics<ParquetType>*>(statistics);
       const ArrowCType min = typed_statistics->min();
       const ArrowCType max = typed_statistics->max();
-      if (std::is_signed<ArrowCType>::value) {
+      if (std::is_floating_point<ArrowCType>::value) {
+        array_statistics->min = static_cast<double>(min);
+        array_statistics->max = static_cast<double>(max);
+      } else if (std::is_signed<ArrowCType>::value) {
         array_statistics->min = static_cast<int64_t>(min);
         array_statistics->max = static_cast<int64_t>(max);
       } else {
         array_statistics->min = static_cast<uint64_t>(min);
         array_statistics->max = static_cast<uint64_t>(max);
       }
-      // We can assume that integer based min/max are always exact if
-      // they exist. Apache Parquet's "Statistics" has
-      // "is_min_value_exact" and "is_max_value_exact" but we can
-      // ignore them for integer based min/max.
+      // We can assume that integer and floating point number based
+      // min/max are always exact if they exist. Apache Parquet's
+      // "Statistics" has "is_min_value_exact" and
+      // "is_max_value_exact" but we can ignore them for integer and
+      // floating point number based min/max.
       //
       // See also the discussion at [email protected]:
       // https://lists.apache.org/thread/zfnmg5p51b7oylft5w5k4670wgkd4zv4
@@ -370,13 +364,41 @@ Status TransferInt(RecordReader* reader,
       array_statistics->is_max_exact = true;
     }
   }
-  array_data->statistics = std::move(array_statistics);
+
+  data->statistics = std::move(array_statistics);
+}
+
+template <typename ArrowType, typename ParquetType>
+Status TransferInt(RecordReader* reader,
+                   std::unique_ptr<::parquet::ColumnChunkMetaData> metadata,
+                   const ReaderContext* ctx, const std::shared_ptr<Field>& 
field,
+                   Datum* out) {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+  int64_t length = reader->values_written();
+  ARROW_ASSIGN_OR_RAISE(auto data,
+                        ::arrow::AllocateBuffer(length * sizeof(ArrowCType), 
ctx->pool));
+
+  auto values = reinterpret_cast<const ParquetCType*>(reader->values());
+  auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
+  std::copy(values, values + length, out_ptr);
+  int64_t null_count = 0;
+  std::vector<std::shared_ptr<Buffer>> buffers = {nullptr, std::move(data)};
+  if (field->nullable()) {
+    null_count = reader->null_count();
+    buffers[0] = reader->ReleaseIsValid();
+  }
+  auto array_data =
+      ::arrow::ArrayData::Make(field->type(), length, std::move(buffers), 
null_count);
+  AttachStatistics<ArrowType, ParquetType>(array_data.get(), 
std::move(metadata), ctx);
   *out = std::make_shared<ArrayType<ArrowType>>(std::move(array_data));
   return Status::OK();
 }
 
-std::shared_ptr<Array> TransferZeroCopy(RecordReader* reader,
-                                        const std::shared_ptr<Field>& field) {
+template <typename ArrowType, typename ParquetType>
+std::shared_ptr<Array> TransferZeroCopy(
+    RecordReader* reader, std::unique_ptr<::parquet::ColumnChunkMetaData> 
metadata,
+    const ReaderContext* ctx, const std::shared_ptr<Field>& field) {
   std::shared_ptr<::arrow::ArrayData> data;
   if (field->nullable()) {
     std::vector<std::shared_ptr<Buffer>> buffers = {reader->ReleaseIsValid(),
@@ -388,7 +410,8 @@ std::shared_ptr<Array> TransferZeroCopy(RecordReader* 
reader,
     data = std::make_shared<::arrow::ArrayData>(field->type(), 
reader->values_written(),
                                                 std::move(buffers), 
/*null_count=*/0);
   }
-  return ::arrow::MakeArray(data);
+  AttachStatistics<ArrowType, ParquetType>(data.get(), std::move(metadata), 
ctx);
+  return ::arrow::MakeArray(std::move(data));
 }
 
 Status TransferBool(RecordReader* reader, bool nullable, MemoryPool* pool, 
Datum* out) {
@@ -794,10 +817,20 @@ Status TransferColumnData(RecordReader* reader,
       break;
     }
     case ::arrow::Type::INT32:
+      result = TransferZeroCopy<::arrow::Int32Type, Int32Type>(
+          reader, std::move(metadata), ctx, value_field);
+      break;
     case ::arrow::Type::INT64:
+      result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
+          reader, std::move(metadata), ctx, value_field);
+      break;
     case ::arrow::Type::FLOAT:
+      result = TransferZeroCopy<::arrow::FloatType, FloatType>(
+          reader, std::move(metadata), ctx, value_field);
+      break;
     case ::arrow::Type::DOUBLE:
-      result = TransferZeroCopy(reader, value_field);
+      result = TransferZeroCopy<::arrow::DoubleType, DoubleType>(
+          reader, std::move(metadata), ctx, value_field);
       break;
     case ::arrow::Type::BOOL:
       RETURN_NOT_OK(TransferBool(reader, value_field->nullable(), pool, 
&result));
@@ -895,7 +928,8 @@ Status TransferColumnData(RecordReader* reader,
           case ::arrow::TimeUnit::MILLI:
           case ::arrow::TimeUnit::MICRO:
           case ::arrow::TimeUnit::NANO:
-            result = TransferZeroCopy(reader, value_field);
+            result = TransferZeroCopy<::arrow::Int64Type, Int64Type>(
+                reader, std::move(metadata), ctx, value_field);
             break;
           default:
             return Status::NotImplemented("TimeUnit not supported");

Reply via email to