This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 51aa748  feat: write manifest avro metadata (#261)
51aa748 is described below

commit 51aa74801c3ce206157a118f19dd365109142add
Author: dongxiao <[email protected]>
AuthorDate: Fri Oct 17 22:03:22 2025 +0800

    feat: write manifest avro metadata (#261)
---
 src/iceberg/avro/avro_writer.cc      | 10 ++++++-
 src/iceberg/json_internal.cc         | 12 +++++++++
 src/iceberg/json_internal.h          | 24 +++++++++++++++++
 src/iceberg/manifest_adapter.h       |  3 +++
 src/iceberg/manifest_writer.cc       | 52 +++++++++++++++++++-----------------
 src/iceberg/test/metadata_io_test.cc |  3 +--
 src/iceberg/v1_metadata.cc           | 10 +++----
 src/iceberg/v2_metadata.cc           | 11 ++++----
 src/iceberg/v3_metadata.cc           | 10 +++----
 9 files changed, 92 insertions(+), 43 deletions(-)

diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index ded0d48..e76149b 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -68,8 +68,16 @@ class AvroWriter::Impl {
     ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
                             CreateOutputStream(options, kDefaultBufferSize));
     arrow_output_stream_ = output_stream->arrow_output_stream();
+    std::map<std::string, std::vector<uint8_t>> metadata;
+    for (const auto& [key, value] : options.properties) {
+      std::vector<uint8_t> vec;
+      vec.reserve(value.size());
+      vec.assign(value.begin(), value.end());
+      metadata.emplace(key, std::move(vec));
+    }
     writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
-        std::move(output_stream), *avro_schema_);
+        std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
+        ::avro::NULL_CODEC /*codec*/, metadata);
     datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
     ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
     return {};
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index 6adf5ab..0ad5461 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -309,6 +309,10 @@ nlohmann::json ToJson(const Schema& schema) {
   return json;
 }
 
+Result<std::string> ToJsonString(const Schema& schema) {
+  return ToJsonString(ToJson(schema));
+}
+
 nlohmann::json ToJson(const SnapshotRef& ref) {
   nlohmann::json json;
   json[kSnapshotId] = ref.snapshot_id;
@@ -490,6 +494,10 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) 
{
   return json;
 }
 
+Result<std::string> ToJsonString(const PartitionSpec& partition_spec) {
+  return ToJsonString(ToJson(partition_spec));
+}
+
 Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
     const nlohmann::json& json, bool allow_field_id_missing) {
   ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json, 
kSourceId));
@@ -785,6 +793,10 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) 
{
   return json;
 }
 
+Result<std::string> ToJsonString(const TableMetadata& table_metadata) {
+  return ToJsonString(ToJson(table_metadata));
+}
+
 namespace {
 
 /// \brief Parse the schemas from the JSON object.
diff --git a/src/iceberg/json_internal.h b/src/iceberg/json_internal.h
index 7d13459..d5eb5bc 100644
--- a/src/iceberg/json_internal.h
+++ b/src/iceberg/json_internal.h
@@ -80,6 +80,12 @@ ICEBERG_EXPORT Result<std::unique_ptr<SortOrder>> 
SortOrderFromJson(
 /// \return The JSON representation of the schema.
 ICEBERG_EXPORT nlohmann::json ToJson(const Schema& schema);
 
+/// \brief Convert an Iceberg Schema to a JSON string.
+///
+/// \param[in] schema The Iceberg schema to convert.
+/// \return The JSON string of the schema.
+ICEBERG_EXPORT Result<std::string> ToJsonString(const Schema& schema);
+
 /// \brief Convert JSON to an Iceberg Schema.
 ///
 /// \param[in] json The JSON representation of the schema.
@@ -148,6 +154,18 @@ ICEBERG_EXPORT Result<std::unique_ptr<PartitionField>> 
PartitionFieldFromJson(
 /// array.
 ICEBERG_EXPORT nlohmann::json ToJson(const PartitionSpec& partition_spec);
 
+/// \brief Serializes a `PartitionSpec` object to a JSON string.
+///
+/// This function converts a `PartitionSpec` object into a JSON representation.
+/// The resulting JSON includes the spec ID and a list of `PartitionField` objects.
+/// Each `PartitionField` is serialized as described in the `ToJson(PartitionField)`
+/// function.
+///
+/// \param partition_spec The `PartitionSpec` object to be serialized.
+/// \return A JSON string of the `PartitionSpec` with its spec ID and fields
+/// array.
+ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec& 
partition_spec);
+
 /// \brief Deserializes a JSON object into a `PartitionSpec` object.
 ///
 /// This function parses the provided JSON and creates a `PartitionSpec` 
object.
@@ -246,6 +264,12 @@ ICEBERG_EXPORT Result<MetadataLogEntry> 
MetadataLogEntryFromJson(
 /// \return A JSON object representing the `TableMetadata`.
 ICEBERG_EXPORT nlohmann::json ToJson(const TableMetadata& table_metadata);
 
+/// \brief Serializes a `TableMetadata` object to a JSON string.
+///
+/// \param table_metadata The `TableMetadata` object to be serialized.
+/// \return A JSON string of the `TableMetadata`.
+ICEBERG_EXPORT Result<std::string> ToJsonString(const TableMetadata& 
table_metadata);
+
 /// \brief Deserializes a JSON object into a `TableMetadata` object.
 ///
 /// \param json The JSON object representing a `TableMetadata`.
diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h
index 7c2a8bb..50f3b0c 100644
--- a/src/iceberg/manifest_adapter.h
+++ b/src/iceberg/manifest_adapter.h
@@ -44,6 +44,9 @@ class ICEBERG_EXPORT ManifestAdapter {
   Status StartAppending();
   Result<ArrowArray*> FinishAppending();
   int64_t size() const { return size_; }
+  const std::unordered_map<std::string, std::string>& metadata() const {
+    return metadata_;
+  }
 
  protected:
   ArrowArray array_;
diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc
index 7bdfee7..a27fb45 100644
--- a/src/iceberg/manifest_writer.cc
+++ b/src/iceberg/manifest_writer.cc
@@ -53,14 +53,16 @@ Status ManifestWriter::Close() {
   return writer_->Close();
 }
 
-Result<std::unique_ptr<Writer>> OpenFileWriter(std::string_view location,
-                                               std::shared_ptr<Schema> schema,
-                                               std::shared_ptr<FileIO> 
file_io) {
+Result<std::unique_ptr<Writer>> OpenFileWriter(
+    std::string_view location, std::shared_ptr<Schema> schema,
+    std::shared_ptr<FileIO> file_io,
+    std::unordered_map<std::string, std::string> properties) {
   ICEBERG_ASSIGN_OR_RAISE(
-      auto writer,
-      WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = 
std::string(location),
-                                                          .schema = 
std::move(schema),
-                                                          .io = 
std::move(file_io)}));
+      auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
+                                               {.path = std::string(location),
+                                                .schema = std::move(schema),
+                                                .io = std::move(file_io),
+                                                .properties = 
std::move(properties)}));
   return writer;
 }
 
@@ -73,9 +75,9 @@ Result<std::unique_ptr<ManifestWriter>> 
ManifestWriter::MakeV1Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto writer,
-      OpenFileWriter(manifest_location, std::move(schema), 
std::move(file_io)));
+  ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                          OpenFileWriter(manifest_location, std::move(schema),
+                                         std::move(file_io), 
adapter->metadata()));
   return std::make_unique<ManifestWriter>(std::move(writer), 
std::move(adapter));
 }
 
@@ -88,9 +90,9 @@ Result<std::unique_ptr<ManifestWriter>> 
ManifestWriter::MakeV2Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto writer,
-      OpenFileWriter(manifest_location, std::move(schema), 
std::move(file_io)));
+  ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                          OpenFileWriter(manifest_location, std::move(schema),
+                                         std::move(file_io), 
adapter->metadata()));
   return std::make_unique<ManifestWriter>(std::move(writer), 
std::move(adapter));
 }
 
@@ -104,9 +106,9 @@ Result<std::unique_ptr<ManifestWriter>> 
ManifestWriter::MakeV3Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto writer,
-      OpenFileWriter(manifest_location, std::move(schema), 
std::move(file_io)));
+  ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                          OpenFileWriter(manifest_location, std::move(schema),
+                                         std::move(file_io), 
adapter->metadata()));
   return std::make_unique<ManifestWriter>(std::move(writer), 
std::move(adapter));
 }
 
@@ -142,9 +144,9 @@ Result<std::unique_ptr<ManifestListWriter>> 
ManifestListWriter::MakeV1Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto writer,
-      OpenFileWriter(manifest_list_location, std::move(schema), 
std::move(file_io)));
+  ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                          OpenFileWriter(manifest_list_location, 
std::move(schema),
+                                         std::move(file_io), 
adapter->metadata()));
   return std::make_unique<ManifestListWriter>(std::move(writer), 
std::move(adapter));
 }
 
@@ -158,9 +160,9 @@ Result<std::unique_ptr<ManifestListWriter>> 
ManifestListWriter::MakeV2Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto writer,
-      OpenFileWriter(manifest_list_location, std::move(schema), 
std::move(file_io)));
+  ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                          OpenFileWriter(manifest_list_location, 
std::move(schema),
+                                         std::move(file_io), 
adapter->metadata()));
 
   return std::make_unique<ManifestListWriter>(std::move(writer), 
std::move(adapter));
 }
@@ -175,9 +177,9 @@ Result<std::unique_ptr<ManifestListWriter>> 
ManifestListWriter::MakeV3Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto writer,
-      OpenFileWriter(manifest_list_location, std::move(schema), 
std::move(file_io)));
+  ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                          OpenFileWriter(manifest_list_location, 
std::move(schema),
+                                         std::move(file_io), 
adapter->metadata()));
   return std::make_unique<ManifestListWriter>(std::move(writer), 
std::move(adapter));
 }
 
diff --git a/src/iceberg/test/metadata_io_test.cc 
b/src/iceberg/test/metadata_io_test.cc
index 1590af4..aff1e9a 100644
--- a/src/iceberg/test/metadata_io_test.cc
+++ b/src/iceberg/test/metadata_io_test.cc
@@ -90,8 +90,7 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) {
 TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) {
   TableMetadata metadata = PrepareMetadata();
 
-  auto json = ToJson(metadata);
-  auto ret = ToJsonString(json);
+  auto ret = ToJsonString(metadata);
   ASSERT_TRUE(ret.has_value());
   auto json_string = ret.value();
 
diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc
index ba381f8..52c52cd 100644
--- a/src/iceberg/v1_metadata.cc
+++ b/src/iceberg/v1_metadata.cc
@@ -19,6 +19,7 @@
 
 #include "iceberg/v1_metadata.h"
 
+#include "iceberg/json_internal.h"
 #include "iceberg/manifest_entry.h"
 #include "iceberg/manifest_list.h"
 #include "iceberg/schema.h"
@@ -47,15 +48,14 @@ Status ManifestEntryAdapterV1::Init() {
       DataFile::kSplitOffsets.field_id(),
       DataFile::kSortOrderId.field_id(),
   };
-  // TODO(xiao.dong) schema to json
-  metadata_["schema"] = "{}";
-  // TODO(xiao.dong) partition spec to json
-  metadata_["partition-spec"] = "{}";
+  ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
+  ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
   if (partition_spec_ != nullptr) {
+    ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], 
ToJsonString(*partition_spec_));
     metadata_["partition-spec-id"] = 
std::to_string(partition_spec_->spec_id());
   }
   metadata_["format-version"] = "1";
-  return InitSchema(kManifestEntryFieldIds);
+  return {};
 }
 
 Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) {
diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc
index e2bbb91..c1407b1 100644
--- a/src/iceberg/v2_metadata.cc
+++ b/src/iceberg/v2_metadata.cc
@@ -19,9 +19,11 @@
 
 #include "iceberg/v2_metadata.h"
 
+#include "iceberg/json_internal.h"
 #include "iceberg/manifest_entry.h"
 #include "iceberg/manifest_list.h"
 #include "iceberg/schema.h"
+#include "iceberg/util/macros.h"
 
 namespace iceberg {
 
@@ -50,16 +52,15 @@ Status ManifestEntryAdapterV2::Init() {
       DataFile::kSortOrderId.field_id(),
       DataFile::kReferencedDataFile.field_id(),
   };
-  // TODO(xiao.dong) schema to json
-  metadata_["schema"] = "{}";
-  // TODO(xiao.dong) partition spec to json
-  metadata_["partition-spec"] = "{}";
+  ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
+  ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
   if (partition_spec_ != nullptr) {
+    ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], 
ToJsonString(*partition_spec_));
     metadata_["partition-spec-id"] = 
std::to_string(partition_spec_->spec_id());
   }
   metadata_["format-version"] = "2";
   metadata_["content"] = "data";
-  return InitSchema(kManifestEntryFieldIds);
+  return {};
 }
 
 Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) {
diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc
index e460598..61474f6 100644
--- a/src/iceberg/v3_metadata.cc
+++ b/src/iceberg/v3_metadata.cc
@@ -19,6 +19,7 @@
 
 #include "iceberg/v3_metadata.h"
 
+#include "iceberg/json_internal.h"
 #include "iceberg/manifest_entry.h"
 #include "iceberg/manifest_list.h"
 #include "iceberg/schema.h"
@@ -54,16 +55,15 @@ Status ManifestEntryAdapterV3::Init() {
       DataFile::kContentOffset.field_id(),
       DataFile::kContentSize.field_id(),
   };
-  // TODO(xiao.dong) schema to json
-  metadata_["schema"] = "{}";
-  // TODO(xiao.dong) partition spec to json
-  metadata_["partition-spec"] = "{}";
+  ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
+  ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
   if (partition_spec_ != nullptr) {
+    ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], 
ToJsonString(*partition_spec_));
     metadata_["partition-spec-id"] = 
std::to_string(partition_spec_->spec_id());
   }
   metadata_["format-version"] = "3";
   metadata_["content"] = "data";
-  return InitSchema(kManifestEntryFieldIds);
+  return {};
 }
 
 Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) {

Reply via email to