This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new d02a91b390 GH-41608: [C++][Python] Extends the add_key_value to
parquet::arrow and PyArrow (#41633)
d02a91b390 is described below
commit d02a91b390465dbf530bfba4d100421922b3edda
Author: mwish <[email protected]>
AuthorDate: Tue Jun 4 22:41:32 2024 +0800
GH-41608: [C++][Python] Extends the add_key_value to parquet::arrow and
PyArrow (#41633)
### Rationale for this change
A previous PR (https://github.com/apache/arrow/pull/34889) added
`AddKeyValueMetadata` to `ParquetFileWriter`. It should now also be exposed
through the parquet::arrow and Python APIs.
### What changes are included in this PR?
1. Add `AddKeyValueMetadata` in parquet::arrow
2. Add `add_key_value_metadata` in pyarrow
3. Add tests for both
### Are these changes tested?
Yes
### Are there any user-facing changes?
New API that allows adding key-value metadata to a Parquet file
* GitHub Issue: #41608
Authored-by: mwish <[email protected]>
Signed-off-by: mwish <[email protected]>
---
cpp/src/parquet/CMakeLists.txt | 1 +
cpp/src/parquet/arrow/arrow_metadata_test.cc | 97 ++++++++++++++++++++++
cpp/src/parquet/arrow/writer.cc | 8 ++
cpp/src/parquet/arrow/writer.h | 10 +++
cpp/src/parquet/file_writer.h | 2 +-
python/pyarrow/_parquet.pxd | 1 +
python/pyarrow/_parquet.pyx | 12 ++-
python/pyarrow/parquet/core.py | 13 +++
.../pyarrow/tests/parquet/test_parquet_writer.py | 15 ++++
9 files changed, 157 insertions(+), 2 deletions(-)
diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt
index 5ac5085a69..dc80f08e72 100644
--- a/cpp/src/parquet/CMakeLists.txt
+++ b/cpp/src/parquet/CMakeLists.txt
@@ -397,6 +397,7 @@ add_parquet_test(writer-test
add_parquet_test(arrow-test
SOURCES
+ arrow/arrow_metadata_test.cc
arrow/arrow_reader_writer_test.cc
arrow/arrow_schema_test.cc
arrow/arrow_statistics_test.cc)
diff --git a/cpp/src/parquet/arrow/arrow_metadata_test.cc
b/cpp/src/parquet/arrow/arrow_metadata_test.cc
new file mode 100644
index 0000000000..6f51222770
--- /dev/null
+++ b/cpp/src/parquet/arrow/arrow_metadata_test.cc
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/key_value_metadata.h"
+
+#include "parquet/api/writer.h"
+
+#include "parquet/arrow/reader.h"
+#include "parquet/arrow/schema.h"
+#include "parquet/arrow/writer.h"
+#include "parquet/file_writer.h"
+#include "parquet/test_util.h"
+
+namespace parquet::arrow {
+
+// Exercises FileWriter::AddKeyValueMetadata(): metadata added in two calls
+// before Close() must be visible both through the writer and when the file is
+// read back, a later value for a repeated key must win, and a call made after
+// Close() must fail.
+TEST(Metadata, AppendMetadata) {
+ // A sample table; its type and structure do not matter in this test case.
+ // NOTE(review): `table` is never handed to the writer below -- confirm
+ // whether a WriteTable() call was intended or the variable can be dropped.
+ auto schema = ::arrow::schema({::arrow::field("f", ::arrow::utf8())});
+ auto table = ::arrow::Table::Make(
+ schema, {::arrow::ArrayFromJSON(::arrow::utf8(), R"(["a", "b", "c"])")});
+
+ auto sink = CreateOutputStream();
+ ArrowWriterProperties::Builder builder;
+ // store_schema() is enabled so that an "ARROW:schema" entry is expected among
+ // the key-value metadata checked below.
+ builder.store_schema();
+ ASSERT_OK_AND_ASSIGN(auto writer,
+ parquet::arrow::FileWriter::Open(
+ *schema, ::arrow::default_memory_pool(), sink,
+ parquet::default_writer_properties(),
builder.build()));
+
+ auto kv_meta = std::make_shared<KeyValueMetadata>();
+ kv_meta->Append("test_key_1", "test_value_1");
+ // <test_key_2, test_value_2_temp> is overwritten by the second
+ // AddKeyValueMetadata() call below.
+ kv_meta->Append("test_key_2", "test_value_2_temp");
+ ASSERT_OK(writer->AddKeyValueMetadata(kv_meta));
+
+ // Second batch of key-value metadata: replaces test_key_2, adds test_key_3.
+ auto kv_meta_added = std::make_shared<::arrow::KeyValueMetadata>();
+ kv_meta_added->Append("test_key_2", "test_value_2");
+ kv_meta_added->Append("test_key_3", "test_value_3");
+
+ ASSERT_OK(writer->AddKeyValueMetadata(kv_meta_added));
+ ASSERT_OK(writer->Close());
+
+ // Once the writer is closed, adding metadata must return an IOError.
+ ASSERT_RAISES(IOError, writer->AddKeyValueMetadata(kv_meta_added));
+
+ // Checks that test_key_{1,2,3} map to test_value_{1,2,3} and that the
+ // "ARROW:schema" entry is present.
+ auto verify_key_value_metadata =
+ [&](const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+ ASSERT_TRUE(nullptr != key_value_metadata);
+
+ // Verify that the keys added before the file writer was closed are present.
+ for (int i = 1; i <= 3; ++i) {
+ auto index = std::to_string(i);
+ PARQUET_ASSIGN_OR_THROW(auto value,
+ key_value_metadata->Get("test_key_" +
index));
+ EXPECT_EQ("test_value_" + index, value);
+ }
+ EXPECT_TRUE(key_value_metadata->Contains("ARROW:schema"));
+ };
+ // Verify the metadata exposed by the writer.
+ verify_key_value_metadata(writer->metadata()->key_value_metadata());
+
+ // NOTE(review): the writer was already closed above, so this second Close()
+ // looks redundant -- confirm it is intentional.
+ ASSERT_OK(writer->Close());
+
+ ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+ // Verify the metadata seen by a reader opened on the finished buffer.
+ {
+ std::unique_ptr<FileReader> reader;
+ FileReaderBuilder reader_builder;
+ ASSERT_OK_NO_THROW(
+
reader_builder.Open(std::make_shared<::arrow::io::BufferReader>(buffer)));
+ ASSERT_OK(
+
reader_builder.properties(default_arrow_reader_properties())->Build(&reader));
+
+
verify_key_value_metadata(reader->parquet_reader()->metadata()->key_value_metadata());
+ }
+}
+
+} // namespace parquet::arrow
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index bd6f542d11..4fd7ef1b47 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -482,6 +482,14 @@ class FileWriterImpl : public FileWriter {
return writer_->metadata();
}
+ /// \brief Append the key-value metadata to the file metadata.
+ ///
+ /// Forwards to the wrapped writer's AddKeyValueMetadata(). That call is
+ /// wrapped in PARQUET_CATCH_NOT_OK so that a failure there (documented as a
+ /// ParquetException once Close() has been called) is returned as an error
+ /// Status rather than propagating as an exception.
+ ::arrow::Status AddKeyValueMetadata(
+ const std::shared_ptr<const ::arrow::KeyValueMetadata>&
key_value_metadata)
+ override {
+ PARQUET_CATCH_NOT_OK(writer_->AddKeyValueMetadata(key_value_metadata));
+ return Status::OK();
+ }
+
private:
friend class FileWriter;
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 1decafedc9..4a1a033a7b 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -143,6 +143,16 @@ class PARQUET_EXPORT FileWriter {
virtual ~FileWriter();
virtual MemoryPool* memory_pool() const = 0;
+ /// \brief Add key-value metadata to the file.
+ /// \param[in] key_value_metadata the metadata to add.
+ /// \note This will overwrite any existing metadata with the same key(s).
+ /// \return An error Status if Close() has already been called.
+ ///
+ /// WARNING: If `store_schema` is enabled, the `ARROW:schema` entry is stored
+ /// in the key-value metadata. Overwriting that key would make the stored
+ /// schema unusable when the file is read.
+ virtual ::arrow::Status AddKeyValueMetadata(
+ const std::shared_ptr<const ::arrow::KeyValueMetadata>&
key_value_metadata) = 0;
/// \brief Return the file metadata, only available after calling Close().
virtual const std::shared_ptr<FileMetaData> metadata() const = 0;
};
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index 31706af86d..d5ea1d7c98 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -202,7 +202,7 @@ class PARQUET_EXPORT ParquetFileWriter {
/// \brief Add key-value metadata to the file.
/// \param[in] key_value_metadata the metadata to add.
- /// \note This will overwrite any existing metadata with the same key.
+ /// \note This will overwrite any existing metadata with the same key(s).
/// \throw ParquetException if Close() has been called.
void AddKeyValueMetadata(
const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index ae4094d8b4..1bfa505c54 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -554,6 +554,7 @@ cdef extern from "parquet/arrow/writer.h" namespace
"parquet::arrow" nogil:
CStatus WriteTable(const CTable& table, int64_t chunk_size)
CStatus NewRowGroup(int64_t chunk_size)
CStatus Close()
+ CStatus AddKeyValueMetadata(const shared_ptr[const CKeyValueMetadata]&
key_value_metadata)
const shared_ptr[CFileMetaData] metadata() const
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index f7724b9b1f..414f0cef4e 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -29,9 +29,10 @@ from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (_Weakrefable, Buffer, Schema,
check_status,
MemoryPool, maybe_unbox_memory_pool,
- Table, NativeFile,
+ Table, KeyValueMetadata,
pyarrow_wrap_chunked_array,
pyarrow_wrap_schema,
+ pyarrow_unwrap_metadata,
pyarrow_unwrap_schema,
pyarrow_wrap_table,
pyarrow_wrap_batch,
@@ -2206,6 +2207,15 @@ cdef class ParquetWriter(_Weakrefable):
check_status(self.writer.get()
.WriteTable(deref(ctable), c_row_group_size))
+ def add_key_value_metadata(self, key_value_metadata):
+ # Append key-value metadata to the file through the C++ writer's
+ # AddKeyValueMetadata().
+ cdef:
+ shared_ptr[const CKeyValueMetadata] c_metadata
+
+ # Wrap the argument in a KeyValueMetadata, then unwrap it to the C++
+ # shared_ptr that the writer takes.
+ c_metadata =
pyarrow_unwrap_metadata(KeyValueMetadata(key_value_metadata))
+ # The C++ call is made inside a `with nogil` block; its CStatus is passed to
+ # check_status (presumably raising on a non-OK status -- confirm).
+ with nogil:
+ check_status(self.writer.get()
+ .AddKeyValueMetadata(c_metadata))
+
@property
def metadata(self):
cdef:
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index 81798b1544..eaff79c8b1 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -1108,6 +1108,19 @@ Examples
if self.file_handle is not None:
self.file_handle.close()
+ def add_key_value_metadata(self, key_value_metadata):
+ """
+ Add key-value metadata to the file.
+
+ Existing metadata entries with the same key(s) are overwritten.
+ The writer must still be open when this is called.
+
+ Parameters
+ ----------
+ key_value_metadata : dict
+ Keys and values must be string-like / coercible to bytes.
+ """
+ assert self.is_open
+ self.writer.add_key_value_metadata(key_value_metadata)
+
def _get_pandas_index_columns(keyvalues):
return (json.loads(keyvalues[b'pandas'].decode('utf8'))
diff --git a/python/pyarrow/tests/parquet/test_parquet_writer.py
b/python/pyarrow/tests/parquet/test_parquet_writer.py
index f4ee7529ae..bc3714a623 100644
--- a/python/pyarrow/tests/parquet/test_parquet_writer.py
+++ b/python/pyarrow/tests/parquet/test_parquet_writer.py
@@ -346,3 +346,18 @@ def test_parquet_writer_store_schema(tempdir):
meta = pq.read_metadata(path2)
assert meta.metadata is None
+
+
+def test_parquet_writer_append_key_value_metadata(tempdir):
+ # Metadata added through ParquetWriter.add_key_value_metadata must be
+ # readable from the written file; for a repeated key the last value wins.
+ table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])
+ path = tempdir / 'metadata.parquet'
+
+ with pq.ParquetWriter(path, table.schema) as writer:
+ writer.write_table(table)
+ writer.add_key_value_metadata({'key1': '1', 'key2': 'x'})
+ # 'key2' is supplied again: its value is expected to replace 'x'.
+ writer.add_key_value_metadata({'key2': '2', 'key3': '3'})
+ # Read the file metadata back; keys and values are compared as bytes.
+ reader = pq.ParquetFile(path)
+ metadata = reader.metadata.metadata
+ assert metadata[b'key1'] == b'1'
+ assert metadata[b'key2'] == b'2'
+ assert metadata[b'key3'] == b'3'