This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 51aa748 feat: write manifest avro metadata (#261)
51aa748 is described below
commit 51aa74801c3ce206157a118f19dd365109142add
Author: dongxiao <[email protected]>
AuthorDate: Fri Oct 17 22:03:22 2025 +0800
feat: write manifest avro metadata (#261)
---
src/iceberg/avro/avro_writer.cc | 10 ++++++-
src/iceberg/json_internal.cc | 12 +++++++++
src/iceberg/json_internal.h | 24 +++++++++++++++++
src/iceberg/manifest_adapter.h | 3 +++
src/iceberg/manifest_writer.cc | 52 +++++++++++++++++++-----------------
src/iceberg/test/metadata_io_test.cc | 3 +--
src/iceberg/v1_metadata.cc | 10 +++----
src/iceberg/v2_metadata.cc | 11 ++++----
src/iceberg/v3_metadata.cc | 10 +++----
9 files changed, 92 insertions(+), 43 deletions(-)
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index ded0d48..e76149b 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -68,8 +68,16 @@ class AvroWriter::Impl {
ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
CreateOutputStream(options, kDefaultBufferSize));
arrow_output_stream_ = output_stream->arrow_output_stream();
+ std::map<std::string, std::vector<uint8_t>> metadata;
+ for (const auto& [key, value] : options.properties) {
+ std::vector<uint8_t> vec;
+ vec.reserve(value.size());
+ vec.assign(value.begin(), value.end());
+ metadata.emplace(key, std::move(vec));
+ }
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
- std::move(output_stream), *avro_schema_);
+ std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
+ ::avro::NULL_CODEC /*codec*/, metadata);
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
return {};
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index 6adf5ab..0ad5461 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -309,6 +309,10 @@ nlohmann::json ToJson(const Schema& schema) {
return json;
}
+Result<std::string> ToJsonString(const Schema& schema) {
+ return ToJsonString(ToJson(schema));
+}
+
nlohmann::json ToJson(const SnapshotRef& ref) {
nlohmann::json json;
json[kSnapshotId] = ref.snapshot_id;
@@ -490,6 +494,10 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec) {
return json;
}
+Result<std::string> ToJsonString(const PartitionSpec& partition_spec) {
+ return ToJsonString(ToJson(partition_spec));
+}
+
Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
const nlohmann::json& json, bool allow_field_id_missing) {
ICEBERG_ASSIGN_OR_RAISE(auto source_id, GetJsonValue<int32_t>(json,
kSourceId));
@@ -785,6 +793,10 @@ nlohmann::json ToJson(const TableMetadata& table_metadata) {
return json;
}
+Result<std::string> ToJsonString(const TableMetadata& table_metadata) {
+ return ToJsonString(ToJson(table_metadata));
+}
+
namespace {
/// \brief Parse the schemas from the JSON object.
diff --git a/src/iceberg/json_internal.h b/src/iceberg/json_internal.h
index 7d13459..d5eb5bc 100644
--- a/src/iceberg/json_internal.h
+++ b/src/iceberg/json_internal.h
@@ -80,6 +80,12 @@ ICEBERG_EXPORT Result<std::unique_ptr<SortOrder>> SortOrderFromJson(
/// \return The JSON representation of the schema.
ICEBERG_EXPORT nlohmann::json ToJson(const Schema& schema);
+/// \brief Convert an Iceberg Schema to a JSON string.
+///
+/// \param[in] schema The Iceberg schema to convert.
+/// \return The JSON string of the schema.
+ICEBERG_EXPORT Result<std::string> ToJsonString(const Schema& schema);
+
/// \brief Convert JSON to an Iceberg Schema.
///
/// \param[in] json The JSON representation of the schema.
@@ -148,6 +154,18 @@ ICEBERG_EXPORT Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
/// array.
ICEBERG_EXPORT nlohmann::json ToJson(const PartitionSpec& partition_spec);
+/// \brief Serializes a `PartitionSpec` object to a JSON string.
+///
+/// This function converts a `PartitionSpec` object into its JSON string form.
+/// The resulting JSON includes the spec ID and a list of `PartitionField` objects.
+/// Each `PartitionField` is serialized as described in the `ToJson(PartitionField)`
+/// function.
+///
+/// \param partition_spec The `PartitionSpec` object to be serialized.
+/// \return A JSON string of the `PartitionSpec` with its spec ID and fields
+/// array.
+ICEBERG_EXPORT Result<std::string> ToJsonString(const PartitionSpec&
partition_spec);
+
/// \brief Deserializes a JSON object into a `PartitionSpec` object.
///
/// This function parses the provided JSON and creates a `PartitionSpec`
object.
@@ -246,6 +264,12 @@ ICEBERG_EXPORT Result<MetadataLogEntry> MetadataLogEntryFromJson(
/// \return A JSON object representing the `TableMetadata`.
ICEBERG_EXPORT nlohmann::json ToJson(const TableMetadata& table_metadata);
+/// \brief Serializes a `TableMetadata` object to a JSON string.
+///
+/// \param table_metadata The `TableMetadata` object to be serialized.
+/// \return A JSON string of the `TableMetadata`.
+ICEBERG_EXPORT Result<std::string> ToJsonString(const TableMetadata&
table_metadata);
+
/// \brief Deserializes a JSON object into a `TableMetadata` object.
///
/// \param json The JSON object representing a `TableMetadata`.
diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h
index 7c2a8bb..50f3b0c 100644
--- a/src/iceberg/manifest_adapter.h
+++ b/src/iceberg/manifest_adapter.h
@@ -44,6 +44,9 @@ class ICEBERG_EXPORT ManifestAdapter {
Status StartAppending();
Result<ArrowArray*> FinishAppending();
int64_t size() const { return size_; }
+ const std::unordered_map<std::string, std::string>& metadata() const {
+ return metadata_;
+ }
protected:
ArrowArray array_;
diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc
index 7bdfee7..a27fb45 100644
--- a/src/iceberg/manifest_writer.cc
+++ b/src/iceberg/manifest_writer.cc
@@ -53,14 +53,16 @@ Status ManifestWriter::Close() {
return writer_->Close();
}
-Result<std::unique_ptr<Writer>> OpenFileWriter(std::string_view location,
- std::shared_ptr<Schema> schema,
- std::shared_ptr<FileIO>
file_io) {
+Result<std::unique_ptr<Writer>> OpenFileWriter(
+ std::string_view location, std::shared_ptr<Schema> schema,
+ std::shared_ptr<FileIO> file_io,
+ std::unordered_map<std::string, std::string> properties) {
ICEBERG_ASSIGN_OR_RAISE(
- auto writer,
- WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path =
std::string(location),
- .schema =
std::move(schema),
- .io =
std::move(file_io)}));
+ auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
+ {.path = std::string(location),
+ .schema = std::move(schema),
+ .io = std::move(file_io),
+ .properties =
std::move(properties)}));
return writer;
}
@@ -73,9 +75,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(
- auto writer,
- OpenFileWriter(manifest_location, std::move(schema),
std::move(file_io)));
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ OpenFileWriter(manifest_location, std::move(schema),
+ std::move(file_io),
adapter->metadata()));
return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter));
}
@@ -88,9 +90,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(
- auto writer,
- OpenFileWriter(manifest_location, std::move(schema),
std::move(file_io)));
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ OpenFileWriter(manifest_location, std::move(schema),
+ std::move(file_io),
adapter->metadata()));
return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter));
}
@@ -104,9 +106,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(
- auto writer,
- OpenFileWriter(manifest_location, std::move(schema),
std::move(file_io)));
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ OpenFileWriter(manifest_location, std::move(schema),
+ std::move(file_io),
adapter->metadata()));
return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter));
}
@@ -142,9 +144,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(
- auto writer,
- OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io)));
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ OpenFileWriter(manifest_list_location,
std::move(schema),
+ std::move(file_io),
adapter->metadata()));
return std::make_unique<ManifestListWriter>(std::move(writer),
std::move(adapter));
}
@@ -158,9 +160,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(
- auto writer,
- OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io)));
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ OpenFileWriter(manifest_list_location,
std::move(schema),
+ std::move(file_io),
adapter->metadata()));
return std::make_unique<ManifestListWriter>(std::move(writer),
std::move(adapter));
}
@@ -175,9 +177,9 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(
- auto writer,
- OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io)));
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ OpenFileWriter(manifest_list_location,
std::move(schema),
+ std::move(file_io),
adapter->metadata()));
return std::make_unique<ManifestListWriter>(std::move(writer),
std::move(adapter));
}
diff --git a/src/iceberg/test/metadata_io_test.cc b/src/iceberg/test/metadata_io_test.cc
index 1590af4..aff1e9a 100644
--- a/src/iceberg/test/metadata_io_test.cc
+++ b/src/iceberg/test/metadata_io_test.cc
@@ -90,8 +90,7 @@ TEST_F(MetadataIOTest, ReadWriteMetadata) {
TEST_F(MetadataIOTest, ReadWriteCompressedMetadata) {
TableMetadata metadata = PrepareMetadata();
- auto json = ToJson(metadata);
- auto ret = ToJsonString(json);
+ auto ret = ToJsonString(metadata);
ASSERT_TRUE(ret.has_value());
auto json_string = ret.value();
diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc
index ba381f8..52c52cd 100644
--- a/src/iceberg/v1_metadata.cc
+++ b/src/iceberg/v1_metadata.cc
@@ -19,6 +19,7 @@
#include "iceberg/v1_metadata.h"
+#include "iceberg/json_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
@@ -47,15 +48,14 @@ Status ManifestEntryAdapterV1::Init() {
DataFile::kSplitOffsets.field_id(),
DataFile::kSortOrderId.field_id(),
};
- // TODO(xiao.dong) schema to json
- metadata_["schema"] = "{}";
- // TODO(xiao.dong) partition spec to json
- metadata_["partition-spec"] = "{}";
+ ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
if (partition_spec_ != nullptr) {
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"],
ToJsonString(*partition_spec_));
metadata_["partition-spec-id"] =
std::to_string(partition_spec_->spec_id());
}
metadata_["format-version"] = "1";
- return InitSchema(kManifestEntryFieldIds);
+ return {};
}
Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) {
diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc
index e2bbb91..c1407b1 100644
--- a/src/iceberg/v2_metadata.cc
+++ b/src/iceberg/v2_metadata.cc
@@ -19,9 +19,11 @@
#include "iceberg/v2_metadata.h"
+#include "iceberg/json_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
+#include "iceberg/util/macros.h"
namespace iceberg {
@@ -50,16 +52,15 @@ Status ManifestEntryAdapterV2::Init() {
DataFile::kSortOrderId.field_id(),
DataFile::kReferencedDataFile.field_id(),
};
- // TODO(xiao.dong) schema to json
- metadata_["schema"] = "{}";
- // TODO(xiao.dong) partition spec to json
- metadata_["partition-spec"] = "{}";
+ ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
if (partition_spec_ != nullptr) {
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"],
ToJsonString(*partition_spec_));
metadata_["partition-spec-id"] =
std::to_string(partition_spec_->spec_id());
}
metadata_["format-version"] = "2";
metadata_["content"] = "data";
- return InitSchema(kManifestEntryFieldIds);
+ return {};
}
Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) {
diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc
index e460598..61474f6 100644
--- a/src/iceberg/v3_metadata.cc
+++ b/src/iceberg/v3_metadata.cc
@@ -19,6 +19,7 @@
#include "iceberg/v3_metadata.h"
+#include "iceberg/json_internal.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
@@ -54,16 +55,15 @@ Status ManifestEntryAdapterV3::Init() {
DataFile::kContentOffset.field_id(),
DataFile::kContentSize.field_id(),
};
- // TODO(xiao.dong) schema to json
- metadata_["schema"] = "{}";
- // TODO(xiao.dong) partition spec to json
- metadata_["partition-spec"] = "{}";
+ ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
if (partition_spec_ != nullptr) {
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"],
ToJsonString(*partition_spec_));
metadata_["partition-spec-id"] =
std::to_string(partition_spec_->spec_id());
}
metadata_["format-version"] = "3";
metadata_["content"] = "data";
- return InitSchema(kManifestEntryFieldIds);
+ return {};
}
Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) {