This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 997226a  ARROW-1983: [C++][Parquet] Add AppendRowGroups and 
WriteMetaDataFile methods
997226a is described below

commit 997226a9263430bc0422189180bc2551aed1f63d
Author: rjzamora <[email protected]>
AuthorDate: Thu Jun 6 16:36:09 2019 -0500

    ARROW-1983: [C++][Parquet] Add AppendRowGroups and WriteMetaDataFile methods
    
    @wesm  @jorisvandenbossche - This is a rough RFC for one possible solution 
to the write phase of 
[ARROW-1983](https://issues.apache.org/jira/browse/ARROW-1983).  Summary:  I 
added new `AppendRowGroups` and `WriteMetaDataFile` methods to the 
`FileMetaData` class (along with python wrappers).  I also added a 
`get_row_group` method to `FileMetaData::FileMetaDataImpl` to make the append 
function easier to write (not sure if this is really necessary).
    
    Feel free to recommend changes and/or complete rewrites.  I realize that an 
ideal solution might be to modify the existing `ParquetWriter`/`write_metadata` 
functionality to accept an existing `FileMetaData` object as input, so I will 
wait to add tests until I know this approach seems reasonable.
    
    Author: rjzamora <[email protected]>
    
    Closes #4405 from rjzamora/metadata-append and squashes the following 
commits:
    
    b4ffc9feb <rjzamora> fixing clang-format style issues
    5999609cd <rjzamora> improve pytesting and remove use of shared_ptr in API
    4085e0efa <rjzamora> adding WriteMetaDataFile to FileMetaData API
    1973b61ee <rjzamora> easy code-review fixes (typos and renaming)
    3cc44231d <rjzamora> more pytest linting
    873ba25fb <rjzamora> fixing a few failed checks
    b3dd4ddb1 <rjzamora> sync with upstream master
    9a01fcbbe <rjzamora> testing read_metadata and lint
    d0f86298e <rjzamora> using write_table in test
    695e9a56c <rjzamora> fixing some linting and warnings
    2544da17e <rjzamora> add python test for multi-dataset metadata
    b7eb85977 <rjzamora> adding test for AppendRowGroups
    7a4c76cfb <rjzamora> using existing WriteFileMetaData function instead of 
new WriteMetaDataFile
    ffd343530 <rjzamora> minor cleanup
    1486f8da0 <rjzamora> add WriteMetaDataFile function
    54f3ea00a <rjzamora> adding method to append row-group metadata to an 
existing FileMetaData object
---
 cpp/src/parquet/arrow/writer.cc      |  6 +++
 cpp/src/parquet/arrow/writer.h       |  5 +++
 cpp/src/parquet/file_writer.cc       |  5 +++
 cpp/src/parquet/file_writer.h        |  4 ++
 cpp/src/parquet/metadata-test.cc     | 81 ++++++++++++++++++++++--------------
 cpp/src/parquet/metadata.cc          | 18 ++++++++
 cpp/src/parquet/metadata.h           |  3 ++
 python/pyarrow/_parquet.pxd          |  5 +++
 python/pyarrow/_parquet.pyx          | 30 +++++++++++++
 python/pyarrow/tests/test_parquet.py | 42 +++++++++++++++++++
 10 files changed, 168 insertions(+), 31 deletions(-)

diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index ad7bd51..d6a9b44 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -1135,6 +1135,12 @@ Status WriteFileMetaData(const FileMetaData& 
file_metadata,
   return Status::OK();
 }
 
+Status WriteMetaDataFile(const FileMetaData& file_metadata,
+                         ::arrow::io::OutputStream* sink) {
+  PARQUET_CATCH_NOT_OK(::parquet::WriteMetaDataFile(file_metadata, sink));
+  return Status::OK();
+}
+
 Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
   RETURN_NOT_OK(table.Validate());
 
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 97ed0f7..8014e1a 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -181,6 +181,11 @@ PARQUET_EXPORT
 ::arrow::Status WriteFileMetaData(const FileMetaData& file_metadata,
                                   ::arrow::io::OutputStream* sink);
 
+/// \brief Write metadata-only Parquet file to indicated Arrow OutputStream
+PARQUET_EXPORT
+::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
+                                  ::arrow::io::OutputStream* sink);
+
 /**
  * Write a Table to Parquet.
  *
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index 87ab7b4..f2f42f3 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -382,6 +382,11 @@ void WriteFileMetaData(const FileMetaData& file_metadata, 
OutputStream* sink) {
   return WriteFileMetaData(file_metadata, &wrapper);
 }
 
+void WriteMetaDataFile(const FileMetaData& file_metadata, ArrowOutputStream* 
sink) {
+  PARQUET_THROW_NOT_OK(sink->Write(PARQUET_MAGIC, 4));
+  return WriteFileMetaData(file_metadata, sink);
+}
+
 const SchemaDescriptor* ParquetFileWriter::schema() const { return 
contents_->schema(); }
 
 const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
diff --git a/cpp/src/parquet/file_writer.h b/cpp/src/parquet/file_writer.h
index cd512cf..cdc787f 100644
--- a/cpp/src/parquet/file_writer.h
+++ b/cpp/src/parquet/file_writer.h
@@ -113,6 +113,10 @@ PARQUET_EXPORT
 void WriteFileMetaData(const FileMetaData& file_metadata,
                        ::arrow::io::OutputStream* sink);
 
+PARQUET_EXPORT
+void WriteMetaDataFile(const FileMetaData& file_metadata,
+                       ::arrow::io::OutputStream* sink);
+
 class PARQUET_EXPORT ParquetFileWriter {
  public:
   // Forward declare a virtual class 'Contents' to aid dependency injection 
and more
diff --git a/cpp/src/parquet/metadata-test.cc b/cpp/src/parquet/metadata-test.cc
index 8b6ebc1..fe796c4 100644
--- a/cpp/src/parquet/metadata-test.cc
+++ b/cpp/src/parquet/metadata-test.cc
@@ -28,35 +28,11 @@ namespace parquet {
 
 namespace metadata {
 
-TEST(Metadata, TestBuildAccess) {
-  parquet::schema::NodeVector fields;
-  parquet::schema::NodePtr root;
-  parquet::SchemaDescriptor schema;
-
-  WriterProperties::Builder prop_builder;
-
-  std::shared_ptr<WriterProperties> props =
-      prop_builder.version(ParquetVersion::PARQUET_2_0)->build();
-
-  fields.push_back(parquet::schema::Int32("int_col", Repetition::REQUIRED));
-  fields.push_back(parquet::schema::Float("float_col", Repetition::REQUIRED));
-  root = parquet::schema::GroupNode::Make("schema", Repetition::REPEATED, 
fields);
-  schema.Init(root);
-
-  int64_t nrows = 1000;
-  int32_t int_min = 100, int_max = 200;
-  EncodedStatistics stats_int;
-  stats_int.set_null_count(0)
-      .set_distinct_count(nrows)
-      .set_min(std::string(reinterpret_cast<const char*>(&int_min), 4))
-      .set_max(std::string(reinterpret_cast<const char*>(&int_max), 4));
-  EncodedStatistics stats_float;
-  float float_min = 100.100f, float_max = 200.200f;
-  stats_float.set_null_count(0)
-      .set_distinct_count(nrows)
-      .set_min(std::string(reinterpret_cast<const char*>(&float_min), 4))
-      .set_max(std::string(reinterpret_cast<const char*>(&float_max), 4));
-
+// Helper function for generating table metadata
+std::unique_ptr<parquet::FileMetaData> GenerateTableMetaData(
+    const parquet::SchemaDescriptor& schema,
+    const std::shared_ptr<WriterProperties>& props, const int64_t& nrows,
+    EncodedStatistics stats_int, EncodedStatistics stats_float) {
   auto f_builder = FileMetaDataBuilder::Make(&schema, props);
   auto rg1_builder = f_builder->AppendRowGroup();
 
@@ -88,8 +64,41 @@ TEST(Metadata, TestBuildAccess) {
   rg2_builder->set_num_rows(nrows / 2);
   rg2_builder->Finish(1024);
 
-  // Read the metadata
-  auto f_accessor = f_builder->Finish();
+  // Return the metadata accessor
+  return f_builder->Finish();
+}
+
+TEST(Metadata, TestBuildAccess) {
+  parquet::schema::NodeVector fields;
+  parquet::schema::NodePtr root;
+  parquet::SchemaDescriptor schema;
+
+  WriterProperties::Builder prop_builder;
+
+  std::shared_ptr<WriterProperties> props =
+      prop_builder.version(ParquetVersion::PARQUET_2_0)->build();
+
+  fields.push_back(parquet::schema::Int32("int_col", Repetition::REQUIRED));
+  fields.push_back(parquet::schema::Float("float_col", Repetition::REQUIRED));
+  root = parquet::schema::GroupNode::Make("schema", Repetition::REPEATED, 
fields);
+  schema.Init(root);
+
+  int64_t nrows = 1000;
+  int32_t int_min = 100, int_max = 200;
+  EncodedStatistics stats_int;
+  stats_int.set_null_count(0)
+      .set_distinct_count(nrows)
+      .set_min(std::string(reinterpret_cast<const char*>(&int_min), 4))
+      .set_max(std::string(reinterpret_cast<const char*>(&int_max), 4));
+  EncodedStatistics stats_float;
+  float float_min = 100.100f, float_max = 200.200f;
+  stats_float.set_null_count(0)
+      .set_distinct_count(nrows)
+      .set_min(std::string(reinterpret_cast<const char*>(&float_min), 4))
+      .set_max(std::string(reinterpret_cast<const char*>(&float_max), 4));
+
+  // Generate the metadata
+  auto f_accessor = GenerateTableMetaData(schema, props, nrows, stats_int, 
stats_float);
 
   // file metadata
   ASSERT_EQ(nrows, f_accessor->num_rows());
@@ -168,6 +177,16 @@ TEST(Metadata, TestBuildAccess) {
   ASSERT_TRUE(rg2_column1->file_path().empty());
   f_accessor->set_file_path("/foo/bar/bar.parquet");
   ASSERT_EQ("/foo/bar/bar.parquet", rg2_column1->file_path());
+
+  // Test AppendRowGroups
+  auto f_accessor_2 = GenerateTableMetaData(schema, props, nrows, stats_int, 
stats_float);
+  f_accessor->AppendRowGroups(*f_accessor_2);
+  ASSERT_EQ(4, f_accessor->num_row_groups());
+  ASSERT_EQ(nrows * 2, f_accessor->num_rows());
+  ASSERT_LE(0, static_cast<int>(f_accessor->size()));
+  ASSERT_EQ(ParquetVersion::PARQUET_2_0, f_accessor->version());
+  ASSERT_EQ(DEFAULT_CREATED_BY, f_accessor->created_by());
+  ASSERT_EQ(3, f_accessor->num_schema_elements());
 }
 
 TEST(Metadata, TestV1Version) {
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index b5266e2..4651105 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -395,6 +395,20 @@ class FileMetaData::FileMetaDataImpl {
     }
   }
 
+  format::RowGroup& row_group(int i) {
+    DCHECK_LT(i, num_row_groups());
+    return metadata_->row_groups[i];
+  }
+
+  void AppendRowGroups(const std::unique_ptr<FileMetaDataImpl>& other) {
+    format::RowGroup other_rg;
+    for (int i = 0; i < other->num_row_groups(); i++) {
+      other_rg = other->row_group(i);
+      metadata_->row_groups.push_back(other_rg);
+      metadata_->num_rows += other_rg.num_rows;
+    }
+  }
+
  private:
   friend FileMetaDataBuilder;
   uint32_t metadata_len_;
@@ -494,6 +508,10 @@ std::shared_ptr<const KeyValueMetadata> 
FileMetaData::key_value_metadata() const
 
 void FileMetaData::set_file_path(const std::string& path) { 
impl_->set_file_path(path); }
 
+void FileMetaData::AppendRowGroups(const FileMetaData& other) {
+  impl_->AppendRowGroups(other.impl_);
+}
+
 void FileMetaData::WriteTo(::arrow::io::OutputStream* dst) const {
   return impl_->WriteTo(dst);
 }
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 4a7ae44..b3a5f7b 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -194,6 +194,9 @@ class PARQUET_EXPORT FileMetaData {
   // Set file_path ColumnChunk fields to a particular value
   void set_file_path(const std::string& path);
 
+  // Merge row-group metadata from "other" FileMetaData object
+  void AppendRowGroups(const FileMetaData& other);
+
  private:
   friend FileMetaDataBuilder;
   explicit FileMetaData(const void* serialized_metadata, uint32_t* 
metadata_len);
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index d387824..ade3fc9 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -225,6 +225,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" 
nogil:
         int num_schema_elements()
 
         void set_file_path(const c_string& path)
+        void AppendRowGroups(const CFileMetaData& other)
 
         unique_ptr[CRowGroupMetaData] RowGroup(int i)
         const SchemaDescriptor* schema()
@@ -332,3 +333,7 @@ cdef extern from "parquet/arrow/writer.h" namespace 
"parquet::arrow" nogil:
             Builder* disallow_truncated_timestamps()
             shared_ptr[ArrowWriterProperties] build()
         c_bool support_deprecated_int96_timestamps()
+
+    CStatus WriteMetaDataFile(
+        const CFileMetaData& file_metadata,
+        const OutputStream* sink)
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index e464c4b..c77949f 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -536,6 +536,36 @@ cdef class FileMetaData:
             c_string c_path = tobytes(path)
         self._metadata.set_file_path(c_path)
 
+    def append_row_groups(self, FileMetaData other):
+        """
+        Append row groups of other FileMetaData object
+        """
+        cdef shared_ptr[CFileMetaData] c_metadata
+
+        c_metadata = other.sp_metadata
+        self._metadata.AppendRowGroups(deref(c_metadata))
+
+    def write_metadata_file(self, where):
+        """
+        Write the metadata object to a metadata-only file
+        """
+        cdef:
+            shared_ptr[OutputStream] sink
+            c_string c_where
+
+        try:
+            where = _stringify_path(where)
+        except TypeError:
+            get_writer(where, &sink)
+        else:
+            c_where = tobytes(where)
+            with nogil:
+                check_status(FileOutputStream.Open(c_where, &sink))
+
+        with nogil:
+            check_status(
+                WriteMetaDataFile(deref(self._metadata), sink.get()))
+
 
 cdef class ParquetSchema:
     cdef:
diff --git a/python/pyarrow/tests/test_parquet.py 
b/python/pyarrow/tests/test_parquet.py
index a97e885..0e6d636 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -2700,3 +2700,45 @@ def test_parquet_file_too_small(tempdir):
         with open(path, 'wb') as f:
             f.write(b'ffff')
         pq.read_table(path)
+
+
+def test_multi_dataset_metadata(tempdir):
+    filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]
+    metapath = str(tempdir / "_metadata")
+
+    # create a test dataset
+    df = pd.DataFrame({
+        'one': [1, 2, 3],
+        'two': [-1, -2, -3],
+        'three': [[1, 2], [2, 3], [3, 4]],
+        })
+    table = pa.Table.from_pandas(df)
+
+    # write dataset twice and collect/merge metadata
+    _meta = None
+    for filename in filenames:
+        meta = []
+        pq.write_table(table, str(tempdir / filename),
+                       metadata_collector=meta)
+        meta[0].set_file_path(filename)
+        if _meta is None:
+            _meta = meta[0]
+        else:
+            _meta.append_row_groups(meta[0])
+
+    # Write merged metadata-only file
+    with open(metapath, "wb") as f:
+        _meta.write_metadata_file(f)
+
+    # Read back the metadata
+    meta = pq.read_metadata(metapath)
+    md = meta.to_dict()
+    _md = _meta.to_dict()
+    for key in _md:
+        if key != 'serialized_size':
+            assert _md[key] == md[key]
+    assert _md['num_columns'] == 3
+    assert _md['num_rows'] == 6
+    assert _md['num_row_groups'] == 2
+    assert _md['serialized_size'] == 0
+    assert md['serialized_size'] > 0

Reply via email to