This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 008d2777ea GH-37111: [C++][Parquet] Dataset: Fixing Schema Cast
(#37793)
008d2777ea is described below
commit 008d2777ea3bdb6cf5f62144ace42ff725bc6255
Author: mwish <[email protected]>
AuthorDate: Wed Sep 20 20:08:20 2023 +0800
GH-37111: [C++][Parquet] Dataset: Fixing Schema Cast (#37793)
### Rationale for this change
Parquet and Arrow have two schemas:
1. Parquet has a SchemaElement[1]; it's language- and implementation-independent.
Parquet Arrow will extract this schema and decode it.
2. Parquet Arrow stores the schema and possibly `field_id` in
`key_value_metadata`[2] when `store_schema` is enabled. When deserializing, it
will first parse `SchemaElement`[1], and if self-defined key_value_metadata
exists, it will use `key_value_metadata` to override the schema from [1].
[1]
https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L356
[2]
https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L1033
The bug arises because, when the dataset parses the `SchemaManifest`, it doesn't
use the `key_value_metadata` from `Metadata`, which causes the problem.
For duration, when `store_schema` is enabled, Parquet will store `Int64` as the
physical type and add an `::arrow::Duration` entry in `key_value_metadata`. There
is no `equal(Duration, i64)`, so a NotImplemented error is raised.
### What changes are included in this PR?
Set `key_value_metadata` in the implementation.
### Are these changes tested?
Yes
### Are there any user-facing changes?
bugfix
* Closes: #37111
Authored-by: mwish <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/dataset/file_parquet.cc | 7 ++++---
cpp/src/arrow/dataset/file_parquet_test.cc | 31 ++++++++++++++++++++++++++++--
2 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet.cc
b/cpp/src/arrow/dataset/file_parquet.cc
index 9d0e8a6515..751937e93b 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -104,11 +104,12 @@ parquet::ArrowReaderProperties MakeArrowReaderProperties(
return arrow_properties;
}
-template <typename M>
Result<std::shared_ptr<SchemaManifest>> GetSchemaManifest(
- const M& metadata, const parquet::ArrowReaderProperties& properties) {
+ const parquet::FileMetaData& metadata,
+ const parquet::ArrowReaderProperties& properties) {
auto manifest = std::make_shared<SchemaManifest>();
- const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata =
nullptr;
+ const std::shared_ptr<const ::arrow::KeyValueMetadata>& key_value_metadata =
+ metadata.key_value_metadata();
RETURN_NOT_OK(SchemaManifest::Make(metadata.schema(), key_value_metadata,
properties,
manifest.get()));
return manifest;
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc
b/cpp/src/arrow/dataset/file_parquet_test.cc
index 8527c3af64..177ca82417 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -65,11 +65,15 @@ class ParquetFormatHelper {
public:
using FormatType = ParquetFileFormat;
- static Result<std::shared_ptr<Buffer>> Write(RecordBatchReader* reader) {
+ static Result<std::shared_ptr<Buffer>> Write(
+ RecordBatchReader* reader,
+ const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+ default_arrow_writer_properties()) {
auto pool = ::arrow::default_memory_pool();
std::shared_ptr<Buffer> out;
auto sink = CreateOutputStream(pool);
- RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink));
+ RETURN_NOT_OK(WriteRecordBatchReader(reader, pool, sink,
default_writer_properties(),
+ arrow_properties));
return sink->Finish();
}
static std::shared_ptr<ParquetFileFormat> MakeFormat() {
@@ -703,6 +707,29 @@ TEST_P(TestParquetFileFormatScan,
PredicatePushdownRowGroupFragmentsUsingStringC
CountRowGroupsInFragment(fragment, {0, 3}, equal(field_ref("x"),
literal("a")));
}
+TEST_P(TestParquetFileFormatScan,
PredicatePushdownRowGroupFragmentsUsingDurationColumn) {
+ // GH-37111: Parquet arrow stores writer schema and possible field_id in
+ // key_value_metadata when store_schema enabled. When storing
`arrow::duration`, it will
+ // be stored as int64. This test ensures that dataset can parse the writer
schema
+ // correctly.
+ auto table = TableFromJSON(schema({field("t", duration(TimeUnit::NANO))}),
+ {
+ R"([{"t": 1}])",
+ R"([{"t": 2}, {"t": 3}])",
+ });
+ TableBatchReader table_reader(*table);
+ ASSERT_OK_AND_ASSIGN(
+ auto buffer,
+ ParquetFormatHelper::Write(
+ &table_reader,
ArrowWriterProperties::Builder().store_schema()->build()));
+ auto source = std::make_shared<FileSource>(buffer);
+ SetSchema({field("t", duration(TimeUnit::NANO))});
+ ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
+
+ auto expr = equal(field_ref("t"), literal(::arrow::DurationScalar(1,
TimeUnit::NANO)));
+ CountRowGroupsInFragment(fragment, {0}, expr);
+}
+
// Tests projection with nested/indexed FieldRefs.
// https://github.com/apache/arrow/issues/35579
TEST_P(TestParquetFileFormatScan, ProjectWithNonNamedFieldRefs) {