This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 008d2777ea GH-37111: [C++][Parquet] Dataset: Fixing Schema Cast 
(#37793)
008d2777ea is described below

commit 008d2777ea3bdb6cf5f62144ace42ff725bc6255
Author: mwish <[email protected]>
AuthorDate: Wed Sep 20 20:08:20 2023 +0800

    GH-37111: [C++][Parquet] Dataset: Fixing Schema Cast (#37793)
    
    
    
    ### Rationale for this change
    
    Parquet and Arrow have two schemas:
    1. Parquet has a SchemaElement[1], which is language- and 
implementation-independent. Parquet Arrow will extract this schema and deduce 
the Arrow schema from it.
    2. Parquet Arrow stores the schema and a possible `field_id` in 
`key_value_metadata`[2] when `store_schema` is enabled. When deserializing, it 
will first parse `SchemaElement`[1], and if the self-defined 
`key_value_metadata` exists, it will use `key_value_metadata` to override [1].
    
    [1] 
https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L356
    [2] 
https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L1033
    
    The bug arises because, when the dataset parses the `SchemaManifest`, it 
doesn't use the `key_value_metadata` from `Metadata`.
    
    For duration, when `store_schema` is enabled, it will store `Int64` as the 
physical type and add a `::arrow::Duration` in `key_value_metadata`. There is 
no `equal(Duration, i64)`, so a not-implemented error is raised.
    
    ### What changes are included in this PR?
    
    Pass the file's `key_value_metadata` to `SchemaManifest::Make` instead of 
`nullptr`.
    
    ### Are these changes tested?
    
    Yes
    
    ### Are there any user-facing changes?
    
    bugfix
    
    * Closes: #37111
    
    Authored-by: mwish <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 cpp/src/arrow/dataset/file_parquet.cc      |  7 ++++---
 cpp/src/arrow/dataset/file_parquet_test.cc | 31 ++++++++++++++++++++++++++++--
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_parquet.cc 
b/cpp/src/arrow/dataset/file_parquet.cc
index 9d0e8a6515..751937e93b 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -104,11 +104,12 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
   return arrow_properties;
 }
 
-template <typename M>
 Result<std::shared_ptr<SchemaManifest>> GetSchemaManifest(
-    const M& metadata, const parquet::ArrowReaderProperties& properties) {
+    const parquet::FileMetaData& metadata,
+    const parquet::ArrowReaderProperties& properties) {
   auto manifest = std::make_shared<SchemaManifest>();
-  const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata = 
nullptr;
+  const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata =
+      metadata.key_value_metadata();
   RETURN_NOT_OK(SchemaManifest::Make(metadata.schema(), key_value_metadata, 
properties,
                                      manifest.get()));
   return manifest;
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc 
b/cpp/src/arrow/dataset/file_parquet_test.cc
index 8527c3af64..177ca82417 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -65,11 +65,15 @@ class ParquetFormatHelper {
  public:
   using FormatType = ParquetFileFormat;
 
-  static Result<std::shared_ptr<Buffer>> Write(RecordBatchReader* reader) {
+  static Result<std::shared_ptr<Buffer>> Write(
+      RecordBatchReader* reader,
+      const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+          default_arrow_writer_properties()) {
     auto pool = ::arrow::default_memory_pool();
     std::shared_ptr<Buffer> out;
     auto sink = CreateOutputStream(pool);
-    RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink));
+    RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink, 
default_writer_properties(),
+                                         arrow_properties));
     return sink->Finish();
   }
   static std::shared_ptr<ParquetFileFormat> MakeFormat() {
@@ -703,6 +707,29 @@ TEST_P(TestParquetFileFormatScan, 
PredicatePushdownRowGroupFragmentsUsingStringC
   CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"), 
literal("a")));
 }
 
+TEST_P(TestParquetFileFormatScan, 
PredicatePushdownRowGroupFragmentsUsingDurationColumn) {
+  // GH-37111: Parquet arrow stores writer schema and possible field_id in
+  // key_value_metadata when store_schema enabled. When storing 
`arrow::duration`, it will
+  // be stored as int64. This test ensures that dataset can parse the writer 
schema
+  // correctly.
+  auto table = TableFromJSON(schema({field("t", duration(TimeUnit::NANO))}),
+                             {
+                                 R"([{"t": 1}])",
+                                 R"([{"t": 2}, {"t": 3}])",
+                             });
+  TableBatchReader table_reader(*table);
+  ASSERT_OK_AND_ASSIGN(
+      auto buffer,
+      ParquetFormatHelper::Write(
+          &table_reader, 
ArrowWriterProperties::Builder().store_schema()->build()));
+  auto source = std::make_shared<FileSource>(buffer);
+  SetSchema({field("t", duration(TimeUnit::NANO))});
+  ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+  auto expr = equal(field_ref("t"), literal(::arrow::DurationScalar(1, 
TimeUnit::NANO)));
+  CountRowGroupsInFragment(fragment, {0}, expr);
+}
+
 // Tests projection with nested/indexed FieldRefs.
 // https://github.com/apache/arrow/issues/35579
 TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {

Reply via email to