This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 997226a ARROW-1983: [C++][Parquet] Add AppendRowGroups and WriteMetaDataFile methods
997226a is described below
commit 997226a9263430bc0422189180bc2551aed1f63d
Author: rjzamora <[email protected]>
AuthorDate: Thu Jun 6 16:36:09 2019 -0500
ARROW-1983: [C++][Parquet] Add AppendRowGroups and WriteMetaDataFile methods
@wesm @jorisvandenbossche - This is a rough RFC for one possible solution
to the write phase of
[ARROW-1983](https://issues.apache.org/jira/browse/ARROW-1983). Summary: I
added new `AppendRowGroups` and `WriteMetaDataFile` methods to the
`FileMetaData` class (along with Python wrappers). I also added a
`get_row_group` method to `FileMetaData::FileMetaDataImpl` to make the append
function easier to write (not sure if this is really necessary).
Feel free to recommend changes and/or complete rewrites. I realize that an
ideal solution might be to modify the existing `ParquetWriter`/`write_metadata`
functionality to accept an existing `FileMetaData` object as input, so I will
wait to add tests until this approach is confirmed to be reasonable.
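For reference, here is a minimal sketch of the intended end-to-end Python usage
(file names and table contents are illustrative; `metadata_collector`,
`set_file_path`, `append_row_groups`, and `write_metadata_file` are the APIs
touched by this patch):
```python
import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative table; any pyarrow Table works here.
table = pa.Table.from_pydict({'one': [1, 2, 3], 'two': [-1, -2, -3]})

merged_meta = None
for filename in ['dataset.0.parquet', 'dataset.1.parquet']:
    collector = []
    # write_table fills `collector` with one FileMetaData per written file
    pq.write_table(table, filename, metadata_collector=collector)
    collector[0].set_file_path(filename)  # record the piece's relative path
    if merged_meta is None:
        merged_meta = collector[0]
    else:
        # New in this patch: merge the other file's row-group metadata
        merged_meta.append_row_groups(collector[0])

# New in this patch: write a metadata-only "_metadata" file for the dataset
with open('_metadata', 'wb') as f:
    merged_meta.write_metadata_file(f)
```
This mirrors the `test_multi_dataset_metadata` pytest added in this patch.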
Author: rjzamora <[email protected]>
Closes #4405 from rjzamora/metadata-append and squashes the following commits:
b4ffc9feb <rjzamora> fixing clang-format style issues
5999609cd <rjzamora> improve pytesting and remove use of shared_ptr in API
4085e0efa <rjzamora> adding WriteMetaDataFile to FileMetaData API
1973b61ee <rjzamora> easy code-review fixes (typos and renaming)
3cc44231d <rjzamora> more pytest linting
873ba25fb <rjzamora> fixing a few failed checks
b3dd4ddb1 <rjzamora> sync with upstream master
9a01fcbbe <rjzamora> testing read_metadata and lint
d0f86298e <rjzamora> using write_table in test
695e9a56c <rjzamora> fixing some linting and warnings
2544da17e <rjzamora> add python test for multi-dataset metadata
b7eb85977 <rjzamora> adding test for AppendRowGroups
7a4c76cfb <rjzamora> using existing WriteFileMetaData function instead of new WriteMetaDataFile
ffd343530 <rjzamora> minor cleanup
1486f8da0 <rjzamora> add WriteMetaDataFile function
54f3ea00a <rjzamora> adding method to append row-group metadata to an existing FileMetadata object
---
cpp/src/parquet/arrow/writer.cc | 6 +++
cpp/src/parquet/arrow/writer.h | 5 +++
cpp/src/parquet/file_writer.cc | 5 +++
cpp/src/parquet/file_writer.h | 4 ++
cpp/src/parquet/metadata-test.cc | 81 ++++++++++++++++++++++--------------
cpp/src/parquet/metadata.cc | 18 ++++++++
cpp/src/parquet/metadata.h | 3 ++
python/pyarrow/_parquet.pxd | 5 +++
python/pyarrow/_parquet.pyx | 30 +++++++++++++
python/pyarrow/tests/test_parquet.py | 42 +++++++++++++++++++
10 files changed, 168 insertions(+), 31 deletions(-)
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index ad7bd51..d6a9b44 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -1135,6 +1135,12 @@ Status WriteFileMetaData(const FileMetaData& file_metadata,
return Status::OK();
}
+Status WriteMetaDataFile(const FileMetaData& file_metadata,
+ ::arrow::io::OutputStream* sink) {
+ PARQUET_CATCH_NOT_OK(::parquet::WriteMetaDataFile(file_metadata, sink));
+ return Status::OK();
+}
+
Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
RETURN_NOT_OK(table.Validate());
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 97ed0f7..8014e1a 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -181,6 +181,11 @@ PARQUET_EXPORT
::arrow::Status WriteFileMetaData(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);
+/// \brief Write metadata-only Parquet file to indicated Arrow OutputStream
+PARQUET_EXPORT
+::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
+ ::arrow::io::OutputStream* sink);
+
/**
* Write a Table to Parquet.
*
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 87ab7b4..f2f42f3 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -382,6 +382,11 @@ void WriteFileMetaData(const FileMetaData& file_metadata, OutputStream* sink) {
return WriteFileMetaData(file_metadata, &wrapper);
}
+void WriteMetaDataFile(const FileMetaData& file_metadata, ArrowOutputStream* sink) {
+ PARQUET_THROW_NOT_OK(sink->Write(PARQUET_MAGIC, 4));
+ return WriteFileMetaData(file_metadata, sink);
+}
+
const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index cd512cf..cdc787f 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -113,6 +113,10 @@ PARQUET_EXPORT
void WriteFileMetaData(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);
+PARQUET_EXPORT
+void WriteMetaDataFile(const FileMetaData& file_metadata,
+ ::arrow::io::OutputStream* sink);
+
class PARQUET_EXPORT ParquetFileWriter {
public:
// Forward declare a virtual class 'Contents' to aid dependency injection and more
diff --git a/cpp/src/parquet/metadata-test.cc b/cpp/src/parquet/metadata-test.cc
index 8b6ebc1..fe796c4 100644
--- a/cpp/src/parquet/metadata-test.cc
+++ b/cpp/src/parquet/metadata-test.cc
@@ -28,35 +28,11 @@ namespace parquet {
namespace metadata {
-TEST(Metadata, TestBuildAccess) {
- parquet::schema::NodeVector fields;
- parquet::schema::NodePtr root;
- parquet::SchemaDescriptor schema;
-
- WriterProperties::Builder prop_builder;
-
- std::shared_ptr<WriterProperties> props =
- prop_builder.version(ParquetVersion::PARQUET_2_0)->build();
-
- fields.push_back(parquet::schema::Int32("int_col", Repetition::REQUIRED));
- fields.push_back(parquet::schema::Float("float_col", Repetition::REQUIRED));
- root = parquet::schema::GroupNode::Make("schema", Repetition::REPEATED, fields);
- schema.Init(root);
-
- int64_t nrows = 1000;
- int32_t int_min = 100, int_max = 200;
- EncodedStatistics stats_int;
- stats_int.set_null_count(0)
- .set_distinct_count(nrows)
- .set_min(std::string(reinterpret_cast<const char*>(&int_min), 4))
- .set_max(std::string(reinterpret_cast<const char*>(&int_max), 4));
- EncodedStatistics stats_float;
- float float_min = 100.100f, float_max = 200.200f;
- stats_float.set_null_count(0)
- .set_distinct_count(nrows)
- .set_min(std::string(reinterpret_cast<const char*>(&float_min), 4))
- .set_max(std::string(reinterpret_cast<const char*>(&float_max), 4));
-
+// Helper function for generating table metadata
+std::unique_ptr<parquet::FileMetaData> GenerateTableMetaData(
+ const parquet::SchemaDescriptor& schema,
+ const std::shared_ptr<WriterProperties>& props, const int64_t& nrows,
+ EncodedStatistics stats_int, EncodedStatistics stats_float) {
auto f_builder = FileMetaDataBuilder::Make(&schema, props);
auto rg1_builder = f_builder->AppendRowGroup();
@@ -88,8 +64,41 @@ TEST(Metadata, TestBuildAccess) {
rg2_builder->set_num_rows(nrows / 2);
rg2_builder->Finish(1024);
- // Read the metadata
- auto f_accessor = f_builder->Finish();
+ // Return the metadata accessor
+ return f_builder->Finish();
+}
+
+TEST(Metadata, TestBuildAccess) {
+ parquet::schema::NodeVector fields;
+ parquet::schema::NodePtr root;
+ parquet::SchemaDescriptor schema;
+
+ WriterProperties::Builder prop_builder;
+
+ std::shared_ptr<WriterProperties> props =
+ prop_builder.version(ParquetVersion::PARQUET_2_0)->build();
+
+ fields.push_back(parquet::schema::Int32("int_col", Repetition::REQUIRED));
+ fields.push_back(parquet::schema::Float("float_col", Repetition::REQUIRED));
+ root = parquet::schema::GroupNode::Make("schema", Repetition::REPEATED, fields);
+ schema.Init(root);
+
+ int64_t nrows = 1000;
+ int32_t int_min = 100, int_max = 200;
+ EncodedStatistics stats_int;
+ stats_int.set_null_count(0)
+ .set_distinct_count(nrows)
+ .set_min(std::string(reinterpret_cast<const char*>(&int_min), 4))
+ .set_max(std::string(reinterpret_cast<const char*>(&int_max), 4));
+ EncodedStatistics stats_float;
+ float float_min = 100.100f, float_max = 200.200f;
+ stats_float.set_null_count(0)
+ .set_distinct_count(nrows)
+ .set_min(std::string(reinterpret_cast<const char*>(&float_min), 4))
+ .set_max(std::string(reinterpret_cast<const char*>(&float_max), 4));
+
+ // Generate the metadata
+ auto f_accessor = GenerateTableMetaData(schema, props, nrows, stats_int, stats_float);
// file metadata
ASSERT_EQ(nrows, f_accessor->num_rows());
@@ -168,6 +177,16 @@ TEST(Metadata, TestBuildAccess) {
ASSERT_TRUE(rg2_column1->file_path().empty());
f_accessor->set_file_path("/foo/bar/bar.parquet");
ASSERT_EQ("/foo/bar/bar.parquet", rg2_column1->file_path());
+
+ // Test AppendRowGroups
+ auto f_accessor_2 = GenerateTableMetaData(schema, props, nrows, stats_int, stats_float);
+ f_accessor->AppendRowGroups(*f_accessor_2);
+ ASSERT_EQ(4, f_accessor->num_row_groups());
+ ASSERT_EQ(nrows * 2, f_accessor->num_rows());
+ ASSERT_LE(0, static_cast<int>(f_accessor->size()));
+ ASSERT_EQ(ParquetVersion::PARQUET_2_0, f_accessor->version());
+ ASSERT_EQ(DEFAULT_CREATED_BY, f_accessor->created_by());
+ ASSERT_EQ(3, f_accessor->num_schema_elements());
}
TEST(Metadata, TestV1Version) {
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index b5266e2..4651105 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -395,6 +395,20 @@ class FileMetaData::FileMetaDataImpl {
}
}
+ format::RowGroup& row_group(int i) {
+ DCHECK_LT(i, num_row_groups());
+ return metadata_->row_groups[i];
+ }
+
+ void AppendRowGroups(const std::unique_ptr<FileMetaDataImpl>& other) {
+ format::RowGroup other_rg;
+ for (int i = 0; i < other->num_row_groups(); i++) {
+ other_rg = other->row_group(i);
+ metadata_->row_groups.push_back(other_rg);
+ metadata_->num_rows += other_rg.num_rows;
+ }
+ }
+
private:
friend FileMetaDataBuilder;
uint32_t metadata_len_;
@@ -494,6 +508,10 @@ std::shared_ptr<const KeyValueMetadata> FileMetaData::key_value_metadata() const
void FileMetaData::set_file_path(const std::string& path) { impl_->set_file_path(path); }
+void FileMetaData::AppendRowGroups(const FileMetaData& other) {
+ impl_->AppendRowGroups(other.impl_);
+}
+
void FileMetaData::WriteTo(::arrow::io::OutputStream* dst) const {
return impl_->WriteTo(dst);
}
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 4a7ae44..b3a5f7b 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -194,6 +194,9 @@ class PARQUET_EXPORT FileMetaData {
// Set file_path ColumnChunk fields to a particular value
void set_file_path(const std::string& path);
+ // Merge row-group metadata from "other" FileMetaData object
+ void AppendRowGroups(const FileMetaData& other);
+
private:
friend FileMetaDataBuilder;
explicit FileMetaData(const void* serialized_metadata, uint32_t* metadata_len);
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index d387824..ade3fc9 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -225,6 +225,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
int num_schema_elements()
void set_file_path(const c_string& path)
+ void AppendRowGroups(const CFileMetaData& other)
unique_ptr[CRowGroupMetaData] RowGroup(int i)
const SchemaDescriptor* schema()
@@ -332,3 +333,7 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
Builder* disallow_truncated_timestamps()
shared_ptr[ArrowWriterProperties] build()
c_bool support_deprecated_int96_timestamps()
+
+ CStatus WriteMetaDataFile(
+ const CFileMetaData& file_metadata,
+ const OutputStream* sink)
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index e464c4b..c77949f 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -536,6 +536,36 @@ cdef class FileMetaData:
c_string c_path = tobytes(path)
self._metadata.set_file_path(c_path)
+ def append_row_groups(self, FileMetaData other):
+ """
+ Append row groups of other FileMetaData object
+ """
+ cdef shared_ptr[CFileMetaData] c_metadata
+
+ c_metadata = other.sp_metadata
+ self._metadata.AppendRowGroups(deref(c_metadata))
+
+ def write_metadata_file(self, where):
+ """
+ Write the metadata object to a metadata-only file
+ """
+ cdef:
+ shared_ptr[OutputStream] sink
+ c_string c_where
+
+ try:
+ where = _stringify_path(where)
+ except TypeError:
+ get_writer(where, &sink)
+ else:
+ c_where = tobytes(where)
+ with nogil:
+ check_status(FileOutputStream.Open(c_where, &sink))
+
+ with nogil:
+ check_status(
+ WriteMetaDataFile(deref(self._metadata), sink.get()))
+
cdef class ParquetSchema:
cdef:
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index a97e885..0e6d636 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -2700,3 +2700,45 @@ def test_parquet_file_too_small(tempdir):
with open(path, 'wb') as f:
f.write(b'ffff')
pq.read_table(path)
+
+
+def test_multi_dataset_metadata(tempdir):
+ filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]
+ metapath = str(tempdir / "_metadata")
+
+ # create a test dataset
+ df = pd.DataFrame({
+ 'one': [1, 2, 3],
+ 'two': [-1, -2, -3],
+ 'three': [[1, 2], [2, 3], [3, 4]],
+ })
+ table = pa.Table.from_pandas(df)
+
+ # write dataset twice and collect/merge metadata
+ _meta = None
+ for filename in filenames:
+ meta = []
+ pq.write_table(table, str(tempdir / filename),
+ metadata_collector=meta)
+ meta[0].set_file_path(filename)
+ if _meta is None:
+ _meta = meta[0]
+ else:
+ _meta.append_row_groups(meta[0])
+
+ # Write merged metadata-only file
+ with open(metapath, "wb") as f:
+ _meta.write_metadata_file(f)
+
+ # Read back the metadata
+ meta = pq.read_metadata(metapath)
+ md = meta.to_dict()
+ _md = _meta.to_dict()
+ for key in _md:
+ if key != 'serialized_size':
+ assert _md[key] == md[key]
+ assert _md['num_columns'] == 3
+ assert _md['num_rows'] == 6
+ assert _md['num_row_groups'] == 2
+ assert _md['serialized_size'] == 0
+ assert md['serialized_size'] > 0