This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 9c74fd2  feat: add json serde for table metadata (#75)
9c74fd2 is described below

commit 9c74fd2aa3b354e5adcf15902257c077bd90ddfb
Author: Gang Wu <[email protected]>
AuthorDate: Thu Apr 17 21:59:30 2025 +0800

    feat: add json serde for table metadata (#75)
---
 src/iceberg/json_internal.cc   | 685 +++++++++++++++++++++++++++++++++++++----
 src/iceberg/json_internal.h    | 158 +++++++---
 src/iceberg/result.h           |  17 +
 src/iceberg/statistics_file.cc |  53 ++--
 src/iceberg/statistics_file.h  |  47 ++-
 src/iceberg/table_metadata.cc  |  23 +-
 src/iceberg/table_metadata.h   |  55 +++-
 src/iceberg/type_fwd.h         |  22 +-
 test/schema_json_test.cc       |   4 +-
 9 files changed, 892 insertions(+), 172 deletions(-)

diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index ec34be1..5fd76c6 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -19,9 +19,11 @@
 
 #include "iceberg/json_internal.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <format>
 #include <regex>
+#include <type_traits>
 #include <unordered_set>
 
 #include <nlohmann/json.hpp>
@@ -32,6 +34,8 @@
 #include "iceberg/schema_internal.h"
 #include "iceberg/snapshot.h"
 #include "iceberg/sort_order.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/table_metadata.h"
 #include "iceberg/transform.h"
 #include "iceberg/type.h"
 #include "iceberg/util/formatter.h"  // IWYU pragma: keep
@@ -41,17 +45,21 @@ namespace iceberg {
 
 namespace {
 
+// Transform constants
 constexpr std::string_view kTransform = "transform";
 constexpr std::string_view kSourceId = "source-id";
 constexpr std::string_view kDirection = "direction";
 constexpr std::string_view kNullOrder = "null-order";
 
+// Sort order constants
 constexpr std::string_view kOrderId = "order-id";
 constexpr std::string_view kFields = "fields";
 
+// Schema constants
 constexpr std::string_view kSchemaId = "schema-id";
 constexpr std::string_view kIdentifierFieldIds = "identifier-field-ids";
 
+// Type constants
 constexpr std::string_view kType = "type";
 constexpr std::string_view kStruct = "struct";
 constexpr std::string_view kList = "list";
@@ -71,6 +79,7 @@ constexpr std::string_view kRequired = "required";
 constexpr std::string_view kElementRequired = "element-required";
 constexpr std::string_view kValueRequired = "value-required";
 
+// Snapshot constants
 constexpr std::string_view kFieldId = "field-id";
 constexpr std::string_view kSpecId = "spec-id";
 constexpr std::string_view kSnapshotId = "snapshot-id";
@@ -83,8 +92,6 @@ constexpr std::string_view kMinSnapshotsToKeep = 
"min-snapshots-to-keep";
 constexpr std::string_view kMaxSnapshotAgeMs = "max-snapshot-age-ms";
 constexpr std::string_view kMaxRefAgeMs = "max-ref-age-ms";
 
-constexpr int64_t kInitialSequenceNumber = 0;
-
 const std::unordered_set<std::string_view> kValidSnapshotSummaryFields = {
     SnapshotSummaryFields::kOperation,
     SnapshotSummaryFields::kAddedDataFiles,
@@ -123,21 +130,51 @@ const std::unordered_set<std::string_view> 
kValidDataOperation = {
     DataOperation::kAppend, DataOperation::kReplace, DataOperation::kOverwrite,
     DataOperation::kDelete};
 
+// TableMetadata constants
+constexpr std::string_view kFormatVersion = "format-version";
+constexpr std::string_view kTableUuid = "table-uuid";
+constexpr std::string_view kLocation = "location";
+constexpr std::string_view kLastSequenceNumber = "last-sequence-number";
+constexpr std::string_view kLastUpdatedMs = "last-updated-ms";
+constexpr std::string_view kLastColumnId = "last-column-id";
+constexpr std::string_view kSchema = "schema";
+constexpr std::string_view kSchemas = "schemas";
+constexpr std::string_view kCurrentSchemaId = "current-schema-id";
+constexpr std::string_view kPartitionSpec = "partition-spec";
+constexpr std::string_view kPartitionSpecs = "partition-specs";
+constexpr std::string_view kDefaultSpecId = "default-spec-id";
+constexpr std::string_view kLastPartitionId = "last-partition-id";
+constexpr std::string_view kProperties = "properties";
+constexpr std::string_view kCurrentSnapshotId = "current-snapshot-id";
+constexpr std::string_view kSnapshots = "snapshots";
+constexpr std::string_view kSnapshotLog = "snapshot-log";
+constexpr std::string_view kMetadataLog = "metadata-log";
+constexpr std::string_view kSortOrders = "sort-orders";
+constexpr std::string_view kDefaultSortOrderId = "default-sort-order-id";
+constexpr std::string_view kRefs = "refs";
+constexpr std::string_view kStatistics = "statistics";
+constexpr std::string_view kPartitionStatistics = "partition-statistics";
+constexpr std::string_view kNextRowId = "next-row-id";
+constexpr std::string_view kMetadataFile = "metadata-file";
+constexpr std::string_view kStatisticsPath = "statistics-path";
+constexpr std::string_view kFileSizeInBytes = "file-size-in-bytes";
+constexpr std::string_view kFileFooterSizeInBytes = 
"file-footer-size-in-bytes";
+constexpr std::string_view kBlobMetadata = "blob-metadata";
+
 template <typename T>
-Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
-  if (!json.contains(key)) {
-    return unexpected<Error>({
-        .kind = ErrorKind::kJsonParseError,
-        .message = std::format("Missing '{}' in {}", key, json.dump()),
-    });
+void SetOptionalField(nlohmann::json& json, std::string_view key,
+                      const std::optional<T>& value) {
+  if (value.has_value()) {
+    json[key] = *value;
   }
+}
+
+template <typename T>
+Result<T> GetJsonValueImpl(const nlohmann::json& json, std::string_view key) {
   try {
     return json.at(key).get<T>();
   } catch (const std::exception& ex) {
-    return unexpected<Error>({
-        .kind = ErrorKind::kJsonParseError,
-        .message = std::format("Failed to parse key '{}' in {}", key, 
json.dump()),
-    });
+    return JsonParseError("Failed to parse key '{}' in {}", key, json.dump());
   }
 }
 
@@ -147,22 +184,154 @@ Result<std::optional<T>> GetJsonValueOptional(const 
nlohmann::json& json,
   if (!json.contains(key)) {
     return std::nullopt;
   }
-  try {
-    return json.at(key).get<T>();
-  } catch (const std::exception& ex) {
-    return unexpected<Error>({
-        .kind = ErrorKind::kJsonParseError,
-        .message = std::format("Failed to parse key '{}' in {}", key, 
json.dump()),
-    });
+  return GetJsonValueImpl<T>(json, key);
+}
+
+template <typename T>
+Result<T> GetJsonValue(const nlohmann::json& json, std::string_view key) {
+  if (!json.contains(key)) {
+    return JsonParseError("Missing '{}' in {}", key, json.dump());
   }
+  return GetJsonValueImpl<T>(json, key);
 }
 
 template <typename T>
-void SetOptionalField(nlohmann::json& json, std::string_view key,
-                      const std::optional<T>& value) {
-  if (value.has_value()) {
-    json[key] = *value;
+Result<T> GetJsonValueOrDefault(const nlohmann::json& json, std::string_view 
key,
+                                T default_value = T{}) {
+  if (!json.contains(key)) {
+    return default_value;
   }
+  return GetJsonValueImpl<T>(json, key);
+}
+
+/// \brief Convert a list of items to a json array.
+///
+/// Note that ToJson(const T&) is required for this function to work.
+///
+/// A plain loop is used rather than std::accumulate: seeding accumulate with
+/// nlohmann::json::array() deduces a nlohmann::json accumulator, which then has
+/// to be converted to array_t (and back) for every element.
+///
+/// \param[in] list The items to serialize.
+/// \return A json array holding ToJson(item) for each item, in order.
+template <typename T>
+nlohmann::json::array_t ToJsonList(const std::vector<T>& list) {
+  nlohmann::json::array_t arr;
+  arr.reserve(list.size());
+  for (const T& item : list) {
+    arr.push_back(ToJson(item));
+  }
+  return arr;
+}
+
+/// \brief Convert a list of shared pointers to a json array.
+///
+/// Each pointer is dereferenced and serialized with ToJson(const T&); the
+/// pointers are dereferenced unconditionally, so they must not be null.
+///
+/// A plain loop is used rather than std::accumulate: seeding accumulate with
+/// nlohmann::json::array() deduces a nlohmann::json accumulator, which then has
+/// to be converted to array_t (and back) for every element.
+///
+/// \param[in] list The items to serialize.
+/// \return A json array holding ToJson(*item) for each item, in order.
+template <typename T>
+nlohmann::json::array_t ToJsonList(const std::vector<std::shared_ptr<T>>& list) {
+  nlohmann::json::array_t arr;
+  arr.reserve(list.size());
+  for (const std::shared_ptr<T>& item : list) {
+    arr.push_back(ToJson(*item));
+  }
+  return arr;
+}
+
+/// \brief Parse a list of items from a JSON object.
+///
+/// \param[in] json The JSON object to parse.
+/// \param[in] key The key to parse.
+/// \param[in] from_json The function to parse an item from a JSON object.
+/// \return The parsed items (empty if `key` is absent), or a parse error if the
+/// value under `key` is not an array or any entry fails to parse.
+template <typename T>
+Result<std::vector<T>> FromJsonList(
+    const nlohmann::json& json, std::string_view key,
+    const std::function<Result<T>(const nlohmann::json&)>& from_json) {
+  std::vector<T> list{};
+  if (json.contains(key)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto list_json, GetJsonValue<nlohmann::json>(json, key));
+    if (!list_json.is_array()) {
+      return JsonParseError("Cannot parse '{}' from non-array: {}", key,
+                            list_json.dump());
+    }
+    for (const auto& entry_json : list_json) {
+      ICEBERG_ASSIGN_OR_RAISE(auto entry, from_json(entry_json));
+      list.emplace_back(std::move(entry));
+    }
+  }
+  // Hand back the populated list; `return {};` would yield an empty vector and
+  // silently drop every parsed entry.
+  return list;
+}
+
+/// \brief Parse a list of shared-pointer items from a JSON object.
+///
+/// \param[in] json The JSON object that may contain `key`.
+/// \param[in] key The key whose value is expected to be a JSON array.
+/// \param[in] from_json Callback that turns one array element into an item.
+/// \return The parsed items (empty if `key` is absent), or a parse error if the
+/// value under `key` is not an array or any element fails to parse.
+template <typename T>
+Result<std::vector<std::shared_ptr<T>>> FromJsonList(
+    const nlohmann::json& json, std::string_view key,
+    const std::function<Result<std::shared_ptr<T>>(const nlohmann::json&)>& from_json) {
+  std::vector<std::shared_ptr<T>> items;
+  // A missing key is not an error: it simply means "no items".
+  if (!json.contains(key)) {
+    return items;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto array_json, GetJsonValue<nlohmann::json>(json, key));
+  if (!array_json.is_array()) {
+    return JsonParseError("Cannot parse '{}' from non-array: {}", key, array_json.dump());
+  }
+
+  for (const auto& element : array_json) {
+    ICEBERG_ASSIGN_OR_RAISE(auto parsed, from_json(element));
+    items.emplace_back(std::move(parsed));
+  }
+  return items;
+}
+
+/// \brief Convert a map of type <std::string, T> to a json object.
+///
+/// Note that ToJson(const T&) is required for this function to work.
+///
+/// A plain loop is used rather than std::accumulate: seeding accumulate with
+/// nlohmann::json::object() deduces a nlohmann::json accumulator, which then has
+/// to be converted to object_t (and back) for every entry.
+///
+/// \param[in] map The entries to serialize.
+/// \return A json object mapping each key to ToJson(value).
+template <typename T>
+nlohmann::json::object_t ToJsonMap(const std::unordered_map<std::string, T>& map) {
+  nlohmann::json::object_t obj;
+  for (const auto& [name, value] : map) {
+    obj[name] = ToJson(value);
+  }
+  return obj;
+}
+
+/// \brief Convert a map of type <std::string, std::shared_ptr<T>> to a json
+/// object.
+///
+/// Each pointer is dereferenced and serialized with ToJson(const T&); the
+/// pointers are dereferenced unconditionally, so they must not be null.
+///
+/// A plain loop is used rather than std::accumulate: seeding accumulate with
+/// nlohmann::json::object() deduces a nlohmann::json accumulator, which then has
+/// to be converted to object_t (and back) for every entry.
+///
+/// \param[in] map The entries to serialize.
+/// \return A json object mapping each key to ToJson(*value).
+template <typename T>
+nlohmann::json::object_t ToJsonMap(
+    const std::unordered_map<std::string, std::shared_ptr<T>>& map) {
+  nlohmann::json::object_t obj;
+  for (const auto& [name, value] : map) {
+    obj[name] = ToJson(*value);
+  }
+  return obj;
+}
+
+/// \brief Parse a map of type <std::string, T> from a JSON object.
+///
+/// \param[in] json The JSON object that may contain `key`.
+/// \param[in] key The key whose value is expected to be a JSON object.
+/// \param[in] from_json Callback that turns one member value into a T. The
+/// default callback reads the value as a string and is only valid when T is
+/// std::string (enforced by a static_assert).
+/// \return The parsed map (empty if `key` is absent), or a parse error if the
+/// value under `key` is not an object or any member fails to parse.
+template <typename T = std::string>
+Result<std::unordered_map<std::string, T>> FromJsonMap(
+    const nlohmann::json& json, std::string_view key,
+    const std::function<Result<T>(const nlohmann::json&)>& from_json =
+        [](const nlohmann::json& json) -> Result<T> {
+      static_assert(std::is_same_v<T, std::string>, "T must be std::string");
+      try {
+        return json.get<std::string>();
+      } catch (const std::exception& ex) {
+        return JsonParseError("Cannot parse {} to a string value: {}", json.dump(),
+                              ex.what());
+      }
+    }) {
+  std::unordered_map<std::string, T> result;
+  // A missing key is not an error: it simply means "no entries".
+  if (!json.contains(key)) {
+    return result;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto object_json, GetJsonValue<nlohmann::json>(json, key));
+  if (!object_json.is_object()) {
+    return JsonParseError("Cannot parse '{}' from non-object: {}", key,
+                          object_json.dump());
+  }
+
+  // Distinct binding names keep the `key` parameter visible inside the loop.
+  for (const auto& [entry_key, entry_json] : object_json.items()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto parsed, from_json(entry_json));
+    result[entry_key] = std::move(parsed);
+  }
+  return result;
 }
 
 }  // namespace
@@ -215,16 +384,16 @@ Result<std::unique_ptr<SortOrder>> 
SortOrderFromJson(const nlohmann::json& json)
   return std::make_unique<SortOrder>(order_id, std::move(sort_fields));
 }
 
-nlohmann::json FieldToJson(const SchemaField& field) {
+nlohmann::json ToJson(const SchemaField& field) {
   nlohmann::json json;
   json[kId] = field.field_id();
   json[kName] = field.name();
   json[kRequired] = !field.optional();
-  json[kType] = TypeToJson(*field.type());
+  json[kType] = ToJson(*field.type());
   return json;
 }
 
-nlohmann::json TypeToJson(const Type& type) {
+nlohmann::json ToJson(const Type& type) {
   switch (type.type_id()) {
     case TypeId::kStruct: {
       const auto& struct_type = static_cast<const StructType&>(type);
@@ -232,7 +401,7 @@ nlohmann::json TypeToJson(const Type& type) {
       json[kType] = kStruct;
       nlohmann::json fields_json = nlohmann::json::array();
       for (const auto& field : struct_type.fields()) {
-        fields_json.push_back(FieldToJson(field));
+        fields_json.push_back(ToJson(field));
         // TODO(gangwu): add default values
       }
       json[kFields] = fields_json;
@@ -246,7 +415,7 @@ nlohmann::json TypeToJson(const Type& type) {
       const auto& element_field = list_type.fields().front();
       json[kElementId] = element_field.field_id();
       json[kElementRequired] = !element_field.optional();
-      json[kElement] = TypeToJson(*element_field.type());
+      json[kElement] = ToJson(*element_field.type());
       return json;
     }
     case TypeId::kMap: {
@@ -256,12 +425,12 @@ nlohmann::json TypeToJson(const Type& type) {
 
       const auto& key_field = map_type.key();
       json[kKeyId] = key_field.field_id();
-      json[kKey] = TypeToJson(*key_field.type());
+      json[kKey] = ToJson(*key_field.type());
 
       const auto& value_field = map_type.value();
       json[kValueId] = value_field.field_id();
       json[kValueRequired] = !value_field.optional();
-      json[kValue] = TypeToJson(*value_field.type());
+      json[kValue] = ToJson(*value_field.type());
       return json;
     }
     case TypeId::kBoolean:
@@ -300,8 +469,8 @@ nlohmann::json TypeToJson(const Type& type) {
   }
 }
 
-nlohmann::json SchemaToJson(const Schema& schema) {
-  nlohmann::json json = TypeToJson(static_cast<const Type&>(schema));
+nlohmann::json ToJson(const Schema& schema) {
+  nlohmann::json json = ToJson(static_cast<const Type&>(schema));
   json[kSchemaId] = schema.schema_id();
   // TODO(gangwu): add identifier-field-ids.
   return json;
@@ -327,7 +496,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
   nlohmann::json json;
   json[kSnapshotId] = snapshot.snapshot_id;
   SetOptionalField(json, kParentSnapshotId, snapshot.parent_snapshot_id);
-  if (snapshot.sequence_number > kInitialSequenceNumber) {
+  if (snapshot.sequence_number > TableMetadata::kInitialSequenceNumber) {
     json[kSequenceNumber] = snapshot.sequence_number;
   }
   json[kTimestampMs] = snapshot.timestamp_ms;
@@ -417,10 +586,7 @@ Result<std::unique_ptr<Type>> TypeFromJson(const 
nlohmann::json& json) {
       if (std::regex_match(type_str, match, fixed_regex)) {
         return std::make_unique<FixedType>(std::stoi(match[1].str()));
       }
-      return unexpected<Error>({
-          .kind = ErrorKind::kJsonParseError,
-          .message = std::format("Invalid fixed type: {}", type_str),
-      });
+      return JsonParseError("Invalid fixed type: {}", type_str);
     } else if (type_str.starts_with("decimal")) {
       std::regex decimal_regex(R"(decimal\(\s*(\d+)\s*,\s*(\d+)\s*\))");
       std::smatch match;
@@ -428,15 +594,9 @@ Result<std::unique_ptr<Type>> TypeFromJson(const 
nlohmann::json& json) {
         return std::make_unique<DecimalType>(std::stoi(match[1].str()),
                                              std::stoi(match[2].str()));
       }
-      return unexpected<Error>({
-          .kind = ErrorKind::kJsonParseError,
-          .message = std::format("Invalid decimal type: {}", type_str),
-      });
+      return JsonParseError("Invalid decimal type: {}", type_str);
     } else {
-      return unexpected<Error>({
-          .kind = ErrorKind::kJsonParseError,
-          .message = std::format("Unknown primitive type: {}", type_str),
-      });
+      return JsonParseError("Unknown primitive type: {}", type_str);
     }
   }
 
@@ -449,10 +609,7 @@ Result<std::unique_ptr<Type>> TypeFromJson(const 
nlohmann::json& json) {
   } else if (type_str == kMap) {
     return MapTypeFromJson(json);
   } else {
-    return unexpected<Error>({
-        .kind = ErrorKind::kJsonParseError,
-        .message = std::format("Unknown complex type: {}", type_str),
-    });
+    return JsonParseError("Unknown complex type: {}", type_str);
   }
 }
 
@@ -472,10 +629,7 @@ Result<std::unique_ptr<Schema>> SchemaFromJson(const 
nlohmann::json& json) {
   ICEBERG_ASSIGN_OR_RAISE(auto type, TypeFromJson(json));
 
   if (type->type_id() != TypeId::kStruct) [[unlikely]] {
-    return unexpected<Error>({
-        .kind = ErrorKind::kJsonParseError,
-        .message = std::format("Schema must be a struct type, but got {}", 
json.dump()),
-    });
+    return JsonParseError("Schema must be a struct type, but got {}", 
json.dump());
   }
 
   auto& struct_type = static_cast<StructType&>(*type);
@@ -602,8 +756,429 @@ Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const 
nlohmann::json& json) {
 
   return std::make_unique<Snapshot>(
       snapshot_id, parent_snapshot_id,
-      sequence_number.has_value() ? *sequence_number : kInitialSequenceNumber,
-      timestamp_ms, manifest_list, std::move(summary), schema_id);
+      sequence_number.value_or(TableMetadata::kInitialSequenceNumber), 
timestamp_ms,
+      manifest_list, std::move(summary), schema_id);
+}
+
+/// \brief Serialize a BlobMetadata to JSON.
+///
+/// The kProperties entry is written only when the properties map is non-empty.
+nlohmann::json ToJson(const BlobMetadata& blob_metadata) {
+  nlohmann::json result;
+  result[kType] = blob_metadata.type;
+  result[kSnapshotId] = blob_metadata.source_snapshot_id;
+  result[kSequenceNumber] = blob_metadata.source_snapshot_sequence_number;
+  result[kFields] = blob_metadata.fields;
+  if (const auto& props = blob_metadata.properties; !props.empty()) {
+    result[kProperties] = props;
+  }
+  return result;
+}
+
+/// \brief Parse a BlobMetadata from JSON.
+///
+/// kType, kSnapshotId, kSequenceNumber and kFields are required; kProperties
+/// falls back to an empty map when absent.
+///
+/// \return The parsed BlobMetadata, or the first parse error encountered.
+Result<BlobMetadata> BlobMetadataFromJson(const nlohmann::json& json) {
+  using Properties = std::unordered_map<std::string, std::string>;
+
+  BlobMetadata result;
+  ICEBERG_ASSIGN_OR_RAISE(result.type, GetJsonValue<std::string>(json, kType));
+  ICEBERG_ASSIGN_OR_RAISE(result.source_snapshot_id,
+                          GetJsonValue<int64_t>(json, kSnapshotId));
+  ICEBERG_ASSIGN_OR_RAISE(result.source_snapshot_sequence_number,
+                          GetJsonValue<int64_t>(json, kSequenceNumber));
+  ICEBERG_ASSIGN_OR_RAISE(result.fields,
+                          GetJsonValue<std::vector<int32_t>>(json, kFields));
+  ICEBERG_ASSIGN_OR_RAISE(result.properties,
+                          GetJsonValueOrDefault<Properties>(json, kProperties));
+  return result;
+}
+
+/// \brief Serialize a StatisticsFile to JSON.
+///
+/// The blob metadata entries are written as a JSON array under kBlobMetadata,
+/// which is emitted even when there are no entries.
+nlohmann::json ToJson(const StatisticsFile& statistics_file) {
+  nlohmann::json blobs = nlohmann::json::array();
+  for (const auto& blob : statistics_file.blob_metadata) {
+    blobs.push_back(ToJson(blob));
+  }
+
+  nlohmann::json result;
+  result[kSnapshotId] = statistics_file.snapshot_id;
+  result[kStatisticsPath] = statistics_file.path;
+  result[kFileSizeInBytes] = statistics_file.file_size_in_bytes;
+  result[kFileFooterSizeInBytes] = statistics_file.file_footer_size_in_bytes;
+  result[kBlobMetadata] = std::move(blobs);
+  return result;
+}
+
+/// \brief Parse a StatisticsFile from JSON.
+///
+/// All of kSnapshotId, kStatisticsPath, kFileSizeInBytes,
+/// kFileFooterSizeInBytes and kBlobMetadata are required, and kBlobMetadata
+/// must be a JSON array.
+///
+/// \return The parsed StatisticsFile, or the first parse error encountered.
+Result<std::unique_ptr<StatisticsFile>> StatisticsFileFromJson(
+    const nlohmann::json& json) {
+  auto stats_file = std::make_unique<StatisticsFile>();
+  ICEBERG_ASSIGN_OR_RAISE(stats_file->snapshot_id,
+                          GetJsonValue<int64_t>(json, kSnapshotId));
+  ICEBERG_ASSIGN_OR_RAISE(stats_file->path,
+                          GetJsonValue<std::string>(json, kStatisticsPath));
+  ICEBERG_ASSIGN_OR_RAISE(stats_file->file_size_in_bytes,
+                          GetJsonValue<int64_t>(json, kFileSizeInBytes));
+  ICEBERG_ASSIGN_OR_RAISE(stats_file->file_footer_size_in_bytes,
+                          GetJsonValue<int64_t>(json, kFileFooterSizeInBytes));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto blob_metadata_array,
+                          GetJsonValue<nlohmann::json>(json, kBlobMetadata));
+  // Reject non-arrays explicitly: iterating a null yields nothing (silently
+  // accepting bad input) and iterating an object walks its member values.
+  if (!blob_metadata_array.is_array()) {
+    return JsonParseError("Cannot parse '{}' from non-array: {}", kBlobMetadata,
+                          blob_metadata_array.dump());
+  }
+  for (const auto& blob_json : blob_metadata_array) {
+    ICEBERG_ASSIGN_OR_RAISE(auto blob, BlobMetadataFromJson(blob_json));
+    stats_file->blob_metadata.push_back(std::move(blob));
+  }
+
+  return stats_file;
+}
+
+/// \brief Serialize a PartitionStatisticsFile to JSON.
+///
+/// Writes the snapshot id, the statistics path and the file size in bytes.
+nlohmann::json ToJson(const PartitionStatisticsFile& partition_statistics_file) {
+  const auto& file = partition_statistics_file;
+
+  nlohmann::json result;
+  result[kSnapshotId] = file.snapshot_id;
+  result[kStatisticsPath] = file.path;
+  result[kFileSizeInBytes] = file.file_size_in_bytes;
+  return result;
+}
+
+/// \brief Parse a PartitionStatisticsFile from JSON.
+///
+/// kSnapshotId, kStatisticsPath and kFileSizeInBytes are all required.
+///
+/// \return The parsed file, or the first parse error encountered.
+Result<std::unique_ptr<PartitionStatisticsFile>> PartitionStatisticsFileFromJson(
+    const nlohmann::json& json) {
+  auto result = std::make_unique<PartitionStatisticsFile>();
+  ICEBERG_ASSIGN_OR_RAISE(result->snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
+  ICEBERG_ASSIGN_OR_RAISE(result->path, GetJsonValue<std::string>(json, kStatisticsPath));
+  ICEBERG_ASSIGN_OR_RAISE(result->file_size_in_bytes,
+                          GetJsonValue<int64_t>(json, kFileSizeInBytes));
+  return result;
+}
+
+/// \brief Serialize a SnapshotLogEntry to JSON.
+///
+/// The timestamp is converted with UnixMsFromTimePointMs before being written.
+nlohmann::json ToJson(const SnapshotLogEntry& snapshot_log_entry) {
+  nlohmann::json result;
+  result[kTimestampMs] = UnixMsFromTimePointMs(snapshot_log_entry.timestamp_ms);
+  result[kSnapshotId] = snapshot_log_entry.snapshot_id;
+  return result;
+}
+
+/// \brief Parse a SnapshotLogEntry from JSON.
+///
+/// kTimestampMs is read as an int64 and converted with TimePointMsFromUnixMs;
+/// kSnapshotId is read as an int64. Both keys are required.
+///
+/// \return The parsed entry, or the first error encountered.
+Result<SnapshotLogEntry> SnapshotLogEntryFromJson(const nlohmann::json& json) {
+  SnapshotLogEntry entry;
+  ICEBERG_ASSIGN_OR_RAISE(auto unix_ms, GetJsonValue<int64_t>(json, kTimestampMs));
+  ICEBERG_ASSIGN_OR_RAISE(entry.timestamp_ms, TimePointMsFromUnixMs(unix_ms));
+  ICEBERG_ASSIGN_OR_RAISE(entry.snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
+  return entry;
+}
+
+/// \brief Serialize a MetadataLogEntry to JSON.
+///
+/// The timestamp is converted with UnixMsFromTimePointMs before being written.
+nlohmann::json ToJson(const MetadataLogEntry& metadata_log_entry) {
+  nlohmann::json result;
+  result[kTimestampMs] = UnixMsFromTimePointMs(metadata_log_entry.timestamp_ms);
+  result[kMetadataFile] = metadata_log_entry.metadata_file;
+  return result;
+}
+
+/// \brief Parse a MetadataLogEntry from JSON.
+///
+/// kTimestampMs is read as an int64 and converted with TimePointMsFromUnixMs;
+/// kMetadataFile is read as a string. Both keys are required.
+///
+/// \return The parsed entry, or the first error encountered.
+Result<MetadataLogEntry> MetadataLogEntryFromJson(const nlohmann::json& json) {
+  MetadataLogEntry entry;
+  ICEBERG_ASSIGN_OR_RAISE(auto unix_ms, GetJsonValue<int64_t>(json, kTimestampMs));
+  ICEBERG_ASSIGN_OR_RAISE(entry.timestamp_ms, TimePointMsFromUnixMs(unix_ms));
+  ICEBERG_ASSIGN_OR_RAISE(entry.metadata_file,
+                          GetJsonValue<std::string>(json, kMetadataFile));
+  return entry;
+}
+
+/// \brief Serialize a TableMetadata to JSON.
+///
+/// Version-dependent output:
+///  - kLastSequenceNumber is written only when format_version > 1.
+///  - kSchema and kPartitionSpec (the single current schema / default spec) are
+///    written only when format_version == 1, and only if a schema / spec with
+///    the matching id exists in the respective list.
+///  - kNextRowId is written only when format_version >= 3.
+///
+/// kCurrentSnapshotId is written as JSON null unless a snapshot with that id is
+/// present in the snapshot list.
+nlohmann::json ToJson(const TableMetadata& table_metadata) {
+  const auto version = table_metadata.format_version;
+  const bool is_v1 = version == 1;
+
+  nlohmann::json json;
+
+  json[kFormatVersion] = version;
+  json[kTableUuid] = table_metadata.table_uuid;
+  json[kLocation] = table_metadata.location;
+  if (version > 1) {
+    json[kLastSequenceNumber] = table_metadata.last_sequence_number;
+  }
+  json[kLastUpdatedMs] = UnixMsFromTimePointMs(table_metadata.last_updated_ms);
+  json[kLastColumnId] = table_metadata.last_column_id;
+
+  // Older readers expect the current schema under "schema". Only v1 needs it,
+  // since schemas and current-schema-id are required from v2 onwards.
+  if (is_v1) {
+    const auto& schemas = table_metadata.schemas;
+    const auto current = std::ranges::find_if(schemas, [&](const auto& schema) {
+      return schema->schema_id() == table_metadata.current_schema_id;
+    });
+    if (current != schemas.cend()) {
+      json[kSchema] = ToJson(**current);
+    }
+  }
+
+  // Current schema id plus the full schema list.
+  json[kCurrentSchemaId] = table_metadata.current_schema_id;
+  json[kSchemas] = ToJsonList(table_metadata.schemas);
+
+  // Older readers expect the default spec under "partition-spec".
+  if (is_v1) {
+    const auto& specs = table_metadata.partition_specs;
+    const auto default_spec = std::ranges::find_if(specs, [&](const auto& spec) {
+      return spec->spec_id() == table_metadata.default_spec_id;
+    });
+    if (default_spec != specs.cend()) {
+      json[kPartitionSpec] = ToJson(**default_spec);
+    }
+  }
+
+  // Default spec id plus the full spec list.
+  json[kDefaultSpecId] = table_metadata.default_spec_id;
+  json[kPartitionSpecs] = ToJsonList(table_metadata.partition_specs);
+  json[kLastPartitionId] = table_metadata.last_partition_id;
+
+  // Default sort order id plus the full sort order list.
+  json[kDefaultSortOrderId] = table_metadata.default_sort_order_id;
+  json[kSortOrders] = ToJsonList(table_metadata.sort_orders);
+
+  // Table properties.
+  json[kProperties] = table_metadata.properties;
+
+  // Only reference a current snapshot that is actually in the snapshot list.
+  const bool has_current_snapshot =
+      std::ranges::any_of(table_metadata.snapshots, [&](const auto& snapshot) {
+        return snapshot->snapshot_id == table_metadata.current_snapshot_id;
+      });
+  if (has_current_snapshot) {
+    json[kCurrentSnapshotId] = table_metadata.current_snapshot_id;
+  } else {
+    json[kCurrentSnapshotId] = nlohmann::json::value_t::null;
+  }
+
+  if (version >= 3) {
+    json[kNextRowId] = table_metadata.next_row_id;
+  }
+
+  json[kRefs] = ToJsonMap(table_metadata.refs);
+  json[kSnapshots] = ToJsonList(table_metadata.snapshots);
+  json[kStatistics] = ToJsonList(table_metadata.statistics);
+  json[kPartitionStatistics] = ToJsonList(table_metadata.partition_statistics);
+  json[kSnapshotLog] = ToJsonList(table_metadata.snapshot_log);
+  json[kMetadataLog] = ToJsonList(table_metadata.metadata_log);
+
+  return json;
+}
+
+namespace {
+
+/// \brief Parse the schemas from the JSON object.
+///
+/// When kSchemas is present, every entry is parsed and the one whose id equals
+/// kCurrentSchemaId becomes the current schema. When it is absent, only format
+/// v1 is accepted and the single schema under kSchema is used instead.
+///
+/// \param[in] json The JSON object to parse.
+/// \param[in] format_version The format version of the table.
+/// \param[out] current_schema_id The current schema ID.
+/// \param[out] schemas The list of schemas; parsed schemas are appended.
+///
+/// \return The current schema or parse error.
+Result<std::shared_ptr<Schema>> ParseSchemas(
+    const nlohmann::json& json, int8_t format_version, int32_t& current_schema_id,
+    std::vector<std::shared_ptr<Schema>>& schemas) {
+  std::shared_ptr<Schema> current_schema;
+
+  if (!json.contains(kSchemas)) {
+    // Legacy layout: a single schema stored under "schema" (v1 only).
+    if (format_version != 1) {
+      return JsonParseError("{} must exist in format v{}", kSchemas, format_version);
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto legacy_json,
+                            GetJsonValue<nlohmann::json>(json, kSchema));
+    ICEBERG_ASSIGN_OR_RAISE(current_schema, SchemaFromJson(legacy_json));
+    current_schema_id = current_schema->schema_id();
+    schemas.push_back(current_schema);
+    return current_schema;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schemas_json,
+                          GetJsonValue<nlohmann::json>(json, kSchemas));
+  if (!schemas_json.is_array()) {
+    return JsonParseError("Cannot parse schemas from non-array: {}",
+                          schemas_json.dump());
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(current_schema_id,
+                          GetJsonValue<int32_t>(json, kCurrentSchemaId));
+  for (const auto& item : schemas_json) {
+    ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<Schema> parsed, SchemaFromJson(item));
+    if (parsed->schema_id() == current_schema_id) {
+      current_schema = parsed;
+    }
+    schemas.push_back(std::move(parsed));
+  }
+
+  // The declared current schema id must refer to one of the listed schemas.
+  if (!current_schema) {
+    return JsonParseError("Cannot find schema with {}={} from {}", kCurrentSchemaId,
+                          current_schema_id, schemas_json.dump());
+  }
+  return current_schema;
+}
+
+/// \brief Parse the partition specs from the JSON object.
+///
+/// When kPartitionSpecs is present, every entry is parsed against the current
+/// schema and kDefaultSpecId is read. When it is absent, only format v1 is
+/// accepted: the single spec under kPartitionSpec is parsed and the default
+/// spec id is set to TableMetadata::kInitialSpecId.
+///
+/// \param[in] json The JSON object to parse.
+/// \param[in] format_version The format version of the table.
+/// \param[in] current_schema The current schema.
+/// \param[out] default_spec_id The default partition spec ID.
+/// \param[out] partition_specs The list of partition specs; parsed specs are
+/// appended.
+///
+/// \return An empty Status on success, or the first parse error encountered.
+Status ParsePartitionSpecs(const nlohmann::json& json, int8_t format_version,
+                           const std::shared_ptr<Schema>& current_schema,
+                           int32_t& default_spec_id,
+                           std::vector<std::shared_ptr<PartitionSpec>>& partition_specs) {
+  if (!json.contains(kPartitionSpecs)) {
+    // Legacy layout: a single spec stored under "partition-spec" (v1 only).
+    if (format_version != 1) {
+      return JsonParseError("{} must exist in format v{}", kPartitionSpecs,
+                            format_version);
+    }
+    default_spec_id = TableMetadata::kInitialSpecId;
+
+    ICEBERG_ASSIGN_OR_RAISE(auto legacy_json,
+                            GetJsonValue<nlohmann::json>(json, kPartitionSpec));
+    ICEBERG_ASSIGN_OR_RAISE(auto legacy_spec,
+                            PartitionSpecFromJson(current_schema, legacy_json));
+    partition_specs.push_back(std::move(legacy_spec));
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_json,
+                          GetJsonValue<nlohmann::json>(json, kPartitionSpecs));
+  if (!specs_json.is_array()) {
+    return JsonParseError("Cannot parse partition specs from non-array: {}",
+                          specs_json.dump());
+  }
+  ICEBERG_ASSIGN_OR_RAISE(default_spec_id, GetJsonValue<int32_t>(json, kDefaultSpecId));
+
+  for (const auto& item : specs_json) {
+    ICEBERG_ASSIGN_OR_RAISE(auto parsed, PartitionSpecFromJson(current_schema, item));
+    partition_specs.push_back(std::move(parsed));
+  }
+  return {};
+}
+
+/// \brief Parse the sort orders from the JSON object.
+///
+/// \param[in] json The JSON object to parse.
+/// \param[in] format_version The format version of the table.
+/// \param[out] default_sort_order_id The default sort order ID.
+/// \param[out] sort_orders The list of sort orders; parsed orders are appended.
+///
+/// \return An empty Status on success; a parse error if a required key is
+/// missing or malformed; a not-implemented error when kSortOrders is absent in
+/// a v1 table (assigning a default sort order is not supported yet).
+Status ParseSortOrders(const nlohmann::json& json, int8_t format_version,
+                       int32_t& default_sort_order_id,
+                       std::vector<std::shared_ptr<SortOrder>>& sort_orders) {
+  if (json.contains(kSortOrders)) {
+    ICEBERG_ASSIGN_OR_RAISE(default_sort_order_id,
+                            GetJsonValue<int32_t>(json, kDefaultSortOrderId));
+    ICEBERG_ASSIGN_OR_RAISE(auto sort_order_array,
+                            GetJsonValue<nlohmann::json>(json, kSortOrders));
+    // Validate the container type like ParseSchemas and ParsePartitionSpecs do;
+    // otherwise a null value would be silently accepted as "no sort orders".
+    if (!sort_order_array.is_array()) {
+      return JsonParseError("Cannot parse sort orders from non-array: {}",
+                            sort_order_array.dump());
+    }
+    for (const auto& sort_order_json : sort_order_array) {
+      ICEBERG_ASSIGN_OR_RAISE(auto sort_order, SortOrderFromJson(sort_order_json));
+      sort_orders.push_back(std::move(sort_order));
+    }
+  } else {
+    if (format_version > 1) {
+      return JsonParseError("{} must exist in format v{}", kSortOrders, format_version);
+    }
+    return NotImplementedError("Assign a default sort order");
+  }
+  return {};
+}
+
+}  // namespace
+
+/// \brief Deserializes a JSON object into a `TableMetadata` object.
+///
+/// Which fields are required depends on the format version that is read first.
+///
+/// \param json The JSON object representing a `TableMetadata`.
+/// \return The parsed `TableMetadata`, or an error if the input is not an object,
+/// the format version is unsupported, a required field is missing, or a nested
+/// element fails to parse.
+Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const nlohmann::json& json) {
+  if (!json.is_object()) {
+    return JsonParseError("Cannot parse metadata from a non-object: {}", json.dump());
+  }
+
+  auto table_metadata = std::make_unique<TableMetadata>();
+
+  // The format version gates most of the version-specific checks below.
+  ICEBERG_ASSIGN_OR_RAISE(table_metadata->format_version,
+                          GetJsonValue<int8_t>(json, kFormatVersion));
+  if (table_metadata->format_version < 1 ||
+      table_metadata->format_version > TableMetadata::kSupportedTableFormatVersion) {
+    return JsonParseError("Cannot read unsupported version: {}",
+                          table_metadata->format_version);
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(table_metadata->table_uuid,
+                          GetJsonValueOrDefault<std::string>(json, kTableUuid));
+  ICEBERG_ASSIGN_OR_RAISE(table_metadata->location,
+                          GetJsonValue<std::string>(json, kLocation));
+
+  // The last sequence number is only read for format versions above 1.
+  if (table_metadata->format_version > 1) {
+    ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_sequence_number,
+                            GetJsonValue<int64_t>(json, kLastSequenceNumber));
+  } else {
+    table_metadata->last_sequence_number = TableMetadata::kInitialSequenceNumber;
+  }
+  ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_column_id,
+                          GetJsonValue<int32_t>(json, kLastColumnId));
+
+  // Schemas come first: the current schema is needed to parse the partition specs.
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto current_schema,
+      ParseSchemas(json, table_metadata->format_version,
+                   table_metadata->current_schema_id, table_metadata->schemas));
+
+  ICEBERG_RETURN_UNEXPECTED(ParsePartitionSpecs(
+      json, table_metadata->format_version, current_schema,
+      table_metadata->default_spec_id, table_metadata->partition_specs));
+
+  if (json.contains(kLastPartitionId)) {
+    ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_partition_id,
+                            GetJsonValue<int32_t>(json, kLastPartitionId));
+  } else {
+    if (table_metadata->format_version > 1) {
+      return JsonParseError("{} must exist in format v{}", kLastPartitionId,
+                            table_metadata->format_version);
+    }
+    // TODO(gangwu): iterate all partition specs to find the largest partition
+    // field id or assign a default value for unpartitioned tables. However,
+    // PartitionSpec::lastAssignedFieldId() is not implemented yet.
+    return NotImplementedError("Find the largest partition field id");
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(ParseSortOrders(json, table_metadata->format_version,
+                                            table_metadata->default_sort_order_id,
+                                            table_metadata->sort_orders));
+
+  // Properties are optional; the member keeps its default when the key is absent.
+  if (json.contains(kProperties)) {
+    ICEBERG_ASSIGN_OR_RAISE(table_metadata->properties, FromJsonMap(json, kProperties));
+  }
+
+  // This field is optional, but internally we set this to -1 when not set
+  ICEBERG_ASSIGN_OR_RAISE(
+      table_metadata->current_snapshot_id,
+      GetJsonValueOrDefault<int64_t>(json, kCurrentSnapshotId,
+                                     TableMetadata::kInvalidSnapshotId));
+
+  // Use the named constant rather than a bare 3 so the row-lineage threshold is
+  // defined in exactly one place.
+  if (table_metadata->format_version >= TableMetadata::kMinFormatVersionRowLineage) {
+    ICEBERG_ASSIGN_OR_RAISE(table_metadata->next_row_id,
+                            GetJsonValue<int64_t>(json, kNextRowId));
+  } else {
+    table_metadata->next_row_id = TableMetadata::kInitialRowId;
+  }
+
+  // Convert through the shared helper instead of building the time point by hand.
+  ICEBERG_ASSIGN_OR_RAISE(auto last_updated_ms,
+                          GetJsonValue<int64_t>(json, kLastUpdatedMs));
+  ICEBERG_ASSIGN_OR_RAISE(table_metadata->last_updated_ms,
+                          TimePointMsFromUnixMs(last_updated_ms));
+
+  if (json.contains(kRefs)) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        table_metadata->refs,
+        FromJsonMap<std::shared_ptr<SnapshotRef>>(json, kRefs, SnapshotRefFromJson));
+  } else if (table_metadata->current_snapshot_id != TableMetadata::kInvalidSnapshotId) {
+    // No refs in the JSON but a current snapshot exists: synthesize a "main"
+    // branch pointing at it. The map stores shared_ptr, so allocate it as one.
+    table_metadata->refs["main"] = std::make_shared<SnapshotRef>(SnapshotRef{
+        .snapshot_id = table_metadata->current_snapshot_id,
+        .retention = SnapshotRef::Branch{},
+    });
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(table_metadata->snapshots,
+                          FromJsonList<Snapshot>(json, kSnapshots, SnapshotFromJson));
+  ICEBERG_ASSIGN_OR_RAISE(
+      table_metadata->statistics,
+      FromJsonList<StatisticsFile>(json, kStatistics, StatisticsFileFromJson));
+  ICEBERG_ASSIGN_OR_RAISE(
+      table_metadata->partition_statistics,
+      FromJsonList<PartitionStatisticsFile>(json, kPartitionStatistics,
+                                            PartitionStatisticsFileFromJson));
+  ICEBERG_ASSIGN_OR_RAISE(
+      table_metadata->snapshot_log,
+      FromJsonList<SnapshotLogEntry>(json, kSnapshotLog, SnapshotLogEntryFromJson));
+  ICEBERG_ASSIGN_OR_RAISE(
+      table_metadata->metadata_log,
+      FromJsonList<MetadataLogEntry>(json, kMetadataLog, MetadataLogEntryFromJson));
+
+  return table_metadata;
 }
 
 }  // namespace iceberg
diff --git a/src/iceberg/json_internal.h b/src/iceberg/json_internal.h
index 450578f..8881d5e 100644
--- a/src/iceberg/json_internal.h
+++ b/src/iceberg/json_internal.h
@@ -24,6 +24,8 @@
 #include <nlohmann/json_fwd.hpp>
 
 #include "iceberg/result.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/table_metadata.h"
 #include "iceberg/type_fwd.h"
 
 namespace iceberg {
@@ -38,16 +40,6 @@ namespace iceberg {
 /// \return A JSON object representing the `SortField` in the form of 
key-value pairs.
 nlohmann::json ToJson(const SortField& sort_field);
 
-/// \brief Serializes a `SortOrder` object to JSON.
-///
-/// This function converts a `SortOrder` object into a JSON representation.
-/// The resulting JSON includes the order ID and a list of `SortField` objects.
-/// Each `SortField` is serialized as described in the `ToJson(SortField)` 
function.
-///
-/// \param sort_order The `SortOrder` object to be serialized.
-/// \return A JSON object representing the `SortOrder` with its order ID and 
fields array.
-nlohmann::json ToJson(const SortOrder& sort_order);
-
 /// \brief Deserializes a JSON object into a `SortField` object.
 ///
 /// This function parses the provided JSON and creates a `SortField` object.
@@ -59,6 +51,16 @@ nlohmann::json ToJson(const SortOrder& sort_order);
 /// JSON is malformed or missing expected fields, an error will be returned.
 Result<std::unique_ptr<SortField>> SortFieldFromJson(const nlohmann::json& 
json);
 
+/// \brief Serializes a `SortOrder` object to JSON.
+///
+/// This function converts a `SortOrder` object into a JSON representation.
+/// The resulting JSON includes the order ID and a list of `SortField` objects.
+/// Each `SortField` is serialized as described in the `ToJson(SortField)` 
function.
+///
+/// \param sort_order The `SortOrder` object to be serialized.
+/// \return A JSON object representing the `SortOrder` with its order ID and 
fields array.
+nlohmann::json ToJson(const SortOrder& sort_order);
+
 /// \brief Deserializes a JSON object into a `SortOrder` object.
 ///
 /// This function parses the provided JSON and creates a `SortOrder` object.
@@ -74,31 +76,7 @@ Result<std::unique_ptr<SortOrder>> SortOrderFromJson(const 
nlohmann::json& json)
 ///
 /// \param[in] schema The Iceberg schema to convert.
 /// \return The JSON representation of the schema.
-nlohmann::json SchemaToJson(const Schema& schema);
-
-/// \brief Convert an Iceberg Type to JSON.
-///
-/// \param[in] type The Iceberg type to convert.
-/// \return The JSON representation of the type.
-nlohmann::json TypeToJson(const Type& type);
-
-/// \brief Convert an Iceberg SchemaField to JSON.
-///
-/// \param[in] field The Iceberg field to convert.
-/// \return The JSON representation of the field.
-nlohmann::json FieldToJson(const SchemaField& field);
-
-/// \brief Serializes a `SnapshotRef` object to JSON.
-///
-/// \param[in] snapshot_ref The `SnapshotRef` object to be serialized.
-/// \return A JSON object representing the `SnapshotRef`.
-nlohmann::json ToJson(const SnapshotRef& snapshot_ref);
-
-/// \brief Serializes a `Snapshot` object to JSON.
-///
-/// \param[in] snapshot The `Snapshot` object to be serialized.
-/// \return A JSON object representing the `snapshot`.
-nlohmann::json ToJson(const Snapshot& snapshot);
+nlohmann::json ToJson(const Schema& schema);
 
 /// \brief Convert JSON to an Iceberg Schema.
 ///
@@ -106,12 +84,24 @@ nlohmann::json ToJson(const Snapshot& snapshot);
 /// \return The Iceberg schema or an error if the conversion fails.
 Result<std::unique_ptr<Schema>> SchemaFromJson(const nlohmann::json& json);
 
+/// \brief Convert an Iceberg Type to JSON.
+///
+/// \param[in] type The Iceberg type to convert.
+/// \return The JSON representation of the type.
+nlohmann::json ToJson(const Type& type);
+
 /// \brief Convert JSON to an Iceberg Type.
 ///
 /// \param[in] json The JSON representation of the type.
 /// \return The Iceberg type or an error if the conversion fails.
 Result<std::unique_ptr<Type>> TypeFromJson(const nlohmann::json& json);
 
+/// \brief Convert an Iceberg SchemaField to JSON.
+///
+/// \param[in] field The Iceberg field to convert.
+/// \return The JSON representation of the field.
+nlohmann::json ToJson(const SchemaField& field);
+
 /// \brief Convert JSON to an Iceberg SchemaField.
 ///
 /// \param[in] json The JSON representation of the field.
@@ -129,18 +119,6 @@ Result<std::unique_ptr<SchemaField>> FieldFromJson(const 
nlohmann::json& json);
 /// pairs.
 nlohmann::json ToJson(const PartitionField& partition_field);
 
-/// \brief Serializes a `PartitionSpec` object to JSON.
-///
-/// This function converts a `PartitionSpec` object into a JSON representation.
-/// The resulting JSON includes the spec ID and a list of `PartitionField` 
objects.
-/// Each `PartitionField` is serialized as described in the 
`ToJson(PartitionField)`
-/// function.
-///
-/// \param partition_spec The `PartitionSpec` object to be serialized.
-/// \return A JSON object representing the `PartitionSpec` with its order ID 
and fields
-/// array.
-nlohmann::json ToJson(const PartitionSpec& partition_spec);
-
 /// \brief Deserializes a JSON object into a `PartitionField` object.
 ///
 /// This function parses the provided JSON and creates a `PartitionField` 
object.
@@ -153,6 +131,18 @@ nlohmann::json ToJson(const PartitionSpec& partition_spec);
 Result<std::unique_ptr<PartitionField>> PartitionFieldFromJson(
     const nlohmann::json& json);
 
+/// \brief Serializes a `PartitionSpec` object to JSON.
+///
+/// This function converts a `PartitionSpec` object into a JSON representation.
+/// The resulting JSON includes the spec ID and a list of `PartitionField` 
objects.
+/// Each `PartitionField` is serialized as described in the 
`ToJson(PartitionField)`
+/// function.
+///
+/// \param partition_spec The `PartitionSpec` object to be serialized.
+/// \return A JSON object representing the `PartitionSpec` with its spec ID and fields
+/// array.
+nlohmann::json ToJson(const PartitionSpec& partition_spec);
+
 /// \brief Deserializes a JSON object into a `PartitionSpec` object.
 ///
 /// This function parses the provided JSON and creates a `PartitionSpec` 
object.
@@ -166,16 +156,90 @@ Result<std::unique_ptr<PartitionField>> 
PartitionFieldFromJson(
 Result<std::unique_ptr<PartitionSpec>> PartitionSpecFromJson(
     const std::shared_ptr<Schema>& schema, const nlohmann::json& json);
 
+/// \brief Serializes a `SnapshotRef` object to JSON.
+///
+/// \param[in] snapshot_ref The `SnapshotRef` object to be serialized.
+/// \return A JSON object representing the `SnapshotRef`.
+nlohmann::json ToJson(const SnapshotRef& snapshot_ref);
+
 /// \brief Deserializes a JSON object into a `SnapshotRef` object.
 ///
 /// \param[in] json The JSON object representing a `SnapshotRef`.
 /// \return A `SnapshotRef` object or an error if the conversion fails.
 Result<std::unique_ptr<SnapshotRef>> SnapshotRefFromJson(const nlohmann::json& 
json);
 
+/// \brief Serializes a `Snapshot` object to JSON.
+///
+/// \param[in] snapshot The `Snapshot` object to be serialized.
+/// \return A JSON object representing the `snapshot`.
+nlohmann::json ToJson(const Snapshot& snapshot);
+
 /// \brief Deserializes a JSON object into a `Snapshot` object.
 ///
 /// \param[in] json The JSON representation of the snapshot.
 /// \return A `Snapshot` object or an error if the conversion fails.
 Result<std::unique_ptr<Snapshot>> SnapshotFromJson(const nlohmann::json& json);
 
+/// \brief Serializes a `StatisticsFile` object to JSON.
+///
+/// \param statistics_file The `StatisticsFile` object to be serialized.
+/// \return A JSON object representing the `StatisticsFile`.
+nlohmann::json ToJson(const StatisticsFile& statistics_file);
+
+/// \brief Deserializes a JSON object into a `StatisticsFile` object.
+///
+/// \param json The JSON object representing a `StatisticsFile`.
+/// \return A `StatisticsFile` object or an error if the conversion fails.
+Result<std::unique_ptr<StatisticsFile>> StatisticsFileFromJson(
+    const nlohmann::json& json);
+
+/// \brief Serializes a `PartitionStatisticsFile` object to JSON.
+///
+/// \param partition_statistics_file The `PartitionStatisticsFile` object to be
+/// serialized.
+/// \return A JSON object representing the `PartitionStatisticsFile`.
+nlohmann::json ToJson(const PartitionStatisticsFile& 
partition_statistics_file);
+
+/// \brief Deserializes a JSON object into a `PartitionStatisticsFile` object.
+///
+/// \param json The JSON object representing a `PartitionStatisticsFile`.
+/// \return A `PartitionStatisticsFile` object or an error if the conversion 
fails.
+Result<std::unique_ptr<PartitionStatisticsFile>> 
PartitionStatisticsFileFromJson(
+    const nlohmann::json& json);
+
+/// \brief Serializes a `SnapshotLogEntry` object to JSON.
+///
+/// \param snapshot_log_entry The `SnapshotLogEntry` object to be serialized.
+/// \return A JSON object representing the `SnapshotLogEntry`.
+nlohmann::json ToJson(const SnapshotLogEntry& snapshot_log_entry);
+
+/// \brief Deserializes a JSON object into a `SnapshotLogEntry` object.
+///
+/// \param json The JSON object representing a `SnapshotLogEntry`.
+/// \return A `SnapshotLogEntry` object or an error if the conversion fails.
+Result<SnapshotLogEntry> SnapshotLogEntryFromJson(const nlohmann::json& json);
+
+/// \brief Serializes a `MetadataLogEntry` object to JSON.
+///
+/// \param metadata_log_entry The `MetadataLogEntry` object to be serialized.
+/// \return A JSON object representing the `MetadataLogEntry`.
+nlohmann::json ToJson(const MetadataLogEntry& metadata_log_entry);
+
+/// \brief Deserializes a JSON object into a `MetadataLogEntry` object.
+///
+/// \param json The JSON object representing a `MetadataLogEntry`.
+/// \return A `MetadataLogEntry` object or an error if the conversion fails.
+Result<MetadataLogEntry> MetadataLogEntryFromJson(const nlohmann::json& json);
+
+/// \brief Serializes a `TableMetadata` object to JSON.
+///
+/// \param table_metadata The `TableMetadata` object to be serialized.
+/// \return A JSON object representing the `TableMetadata`.
+nlohmann::json ToJson(const TableMetadata& table_metadata);
+
+/// \brief Deserializes a JSON object into a `TableMetadata` object.
+///
+/// \param json The JSON object representing a `TableMetadata`.
+/// \return A `TableMetadata` object or an error if the conversion fails.
+Result<std::unique_ptr<TableMetadata>> TableMetadataFromJson(const 
nlohmann::json& json);
+
 }  // namespace iceberg
diff --git a/src/iceberg/result.h b/src/iceberg/result.h
index a70383d..06c7874 100644
--- a/src/iceberg/result.h
+++ b/src/iceberg/result.h
@@ -19,6 +19,7 @@
 
 #pragma once
 
+#include <format>
 #include <string>
 
 #include "iceberg/expected.h"
@@ -61,4 +62,20 @@ using Result = expected<T, E>;
 
 using Status = Result<void>;
 
+/// \brief Build an unexpected `Error` whose kind is kNotImplemented.
+///
+/// \param fmt A `std::format` format string for the error message.
+/// \param args The arguments substituted into `fmt`.
+/// \return An `unexpected<Error>` carrying the formatted message.
+template <typename... Args>
+auto NotImplementedError(const std::format_string<Args...> fmt, Args&&... args)
+    -> unexpected<Error> {
+  Error error{.kind = ErrorKind::kNotImplemented,
+              .message = std::format(fmt, std::forward<Args>(args)...)};
+  return unexpected<Error>(std::move(error));
+}
+
+/// \brief Build an unexpected `Error` whose kind is kJsonParseError.
+///
+/// \param fmt A `std::format` format string for the error message.
+/// \param args The arguments substituted into `fmt`.
+/// \return An `unexpected<Error>` carrying the formatted message.
+template <typename... Args>
+auto JsonParseError(const std::format_string<Args...> fmt, Args&&... args)
+    -> unexpected<Error> {
+  Error error{.kind = ErrorKind::kJsonParseError,
+              .message = std::format(fmt, std::forward<Args>(args)...)};
+  return unexpected<Error>(std::move(error));
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/statistics_file.cc b/src/iceberg/statistics_file.cc
index 07f4f53..5ae753a 100644
--- a/src/iceberg/statistics_file.cc
+++ b/src/iceberg/statistics_file.cc
@@ -23,29 +23,26 @@
 
 namespace iceberg {
 
-bool BlobMetadata::Equals(const BlobMetadata& other) const {
-  return type == other.type && source_snapshot_id == other.source_snapshot_id 
&&
-         source_snapshot_sequence_number == 
other.source_snapshot_sequence_number &&
-         fields == other.fields && properties == other.properties;
-}
-
-std::string BlobMetadata::ToString() const {
+std::string ToString(const BlobMetadata& blob_metadata) {
   std::string repr = "BlobMetadata[";
   std::format_to(std::back_inserter(repr),
-                 
"type='{}',sourceSnapshotId={},sourceSnapshotSequenceNumber={},", type,
-                 source_snapshot_id, source_snapshot_sequence_number);
+                 
"type='{}',sourceSnapshotId={},sourceSnapshotSequenceNumber={},",
+                 blob_metadata.type, blob_metadata.source_snapshot_id,
+                 blob_metadata.source_snapshot_sequence_number);
   std::format_to(std::back_inserter(repr), "fields=[");
-  for (auto iter = fields.cbegin(); iter != fields.cend(); ++iter) {
-    if (iter != fields.cbegin()) {
+  for (auto iter = blob_metadata.fields.cbegin(); iter != 
blob_metadata.fields.cend();
+       ++iter) {
+    if (iter != blob_metadata.fields.cbegin()) {
       std::format_to(std::back_inserter(repr), ",{}", *iter);
     } else {
       std::format_to(std::back_inserter(repr), "{}", *iter);
     }
   }
   std::format_to(std::back_inserter(repr), "],properties=[");
-  for (auto iter = properties.cbegin(); iter != properties.cend(); ++iter) {
+  for (auto iter = blob_metadata.properties.cbegin();
+       iter != blob_metadata.properties.cend(); ++iter) {
     const auto& [key, value] = *iter;
-    if (iter != properties.cbegin()) {
+    if (iter != blob_metadata.properties.cbegin()) {
       std::format_to(std::back_inserter(repr), ",{}:{}", key, value);
     } else {
       std::format_to(std::back_inserter(repr), "{}:{}", key, value);
@@ -55,28 +52,32 @@ std::string BlobMetadata::ToString() const {
   return repr;
 }
 
-bool StatisticsFile::Equals(const StatisticsFile& other) const {
-  return snapshot_id == other.snapshot_id && path == other.path &&
-         file_size_in_bytes == other.file_size_in_bytes &&
-         file_footer_size_in_bytes == other.file_footer_size_in_bytes &&
-         blob_metadata == other.blob_metadata;
-}
-
-std::string StatisticsFile::ToString() const {
+std::string ToString(const StatisticsFile& statistics_file) {
   std::string repr = "StatisticsFile[";
   std::format_to(std::back_inserter(repr),
                  
"snapshotId={},path={},fileSizeInBytes={},fileFooterSizeInBytes={},",
-                 snapshot_id, path, file_size_in_bytes, 
file_footer_size_in_bytes);
+                 statistics_file.snapshot_id, statistics_file.path,
+                 statistics_file.file_size_in_bytes,
+                 statistics_file.file_footer_size_in_bytes);
   std::format_to(std::back_inserter(repr), "blobMetadata=[");
-  for (auto iter = blob_metadata.cbegin(); iter != blob_metadata.cend(); 
++iter) {
-    if (iter != blob_metadata.cbegin()) {
-      std::format_to(std::back_inserter(repr), ",{}", iter->ToString());
+  for (auto iter = statistics_file.blob_metadata.cbegin();
+       iter != statistics_file.blob_metadata.cend(); ++iter) {
+    if (iter != statistics_file.blob_metadata.cbegin()) {
+      std::format_to(std::back_inserter(repr), ",{}", ToString(*iter));
     } else {
-      std::format_to(std::back_inserter(repr), "{}", iter->ToString());
+      std::format_to(std::back_inserter(repr), "{}", ToString(*iter));
     }
   }
   repr += "]]";
   return repr;
 }
 
+/// \brief Returns a string representation of a PartitionStatisticsFile.
+///
+/// The result has the form
+/// `PartitionStatisticsFile[snapshotId=<id>,path=<path>,fileSizeInBytes=<size>]`.
+///
+/// \param partition_statistics_file The partition statistics file to describe.
+/// \return The formatted representation.
+std::string ToString(const PartitionStatisticsFile& partition_statistics_file) {
+  std::string repr = "PartitionStatisticsFile[";
+  // fileSizeInBytes is the last field: no trailing separator, and the bracket
+  // opened in the prefix above must be closed here.
+  std::format_to(std::back_inserter(repr), "snapshotId={},path={},fileSizeInBytes={}]",
+                 partition_statistics_file.snapshot_id, partition_statistics_file.path,
+                 partition_statistics_file.file_size_in_bytes);
+  return repr;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/statistics_file.h b/src/iceberg/statistics_file.h
index 0de9587..5bdc1c1 100644
--- a/src/iceberg/statistics_file.h
+++ b/src/iceberg/statistics_file.h
@@ -28,12 +28,11 @@
 #include <vector>
 
 #include "iceberg/iceberg_export.h"
-#include "iceberg/util/formattable.h"
 
 namespace iceberg {
 
 /// \brief A metadata about a statistics or indices blob
-struct ICEBERG_EXPORT BlobMetadata : public util::Formattable {
+struct ICEBERG_EXPORT BlobMetadata {
   /// Type of the blob
   std::string type;
   /// ID of the Iceberg table's snapshot the blob was computed from
@@ -47,22 +46,19 @@ struct ICEBERG_EXPORT BlobMetadata : public 
util::Formattable {
 
   /// \brief Compare two BlobMetadatas for equality.
   friend bool operator==(const BlobMetadata& lhs, const BlobMetadata& rhs) {
-    return lhs.Equals(rhs);
+    return lhs.type == rhs.type && lhs.source_snapshot_id == 
rhs.source_snapshot_id &&
+           lhs.source_snapshot_sequence_number == 
rhs.source_snapshot_sequence_number &&
+           lhs.fields == rhs.fields && lhs.properties == rhs.properties;
   }
 
   /// \brief Compare two BlobMetadatas for inequality.
   friend bool operator!=(const BlobMetadata& lhs, const BlobMetadata& rhs) {
     return !(lhs == rhs);
   }
-
-  std::string ToString() const override;
-
- private:
-  bool Equals(const BlobMetadata& other) const;
 };
 
 /// \brief Represents a statistics file in the Puffin format
-struct ICEBERG_EXPORT StatisticsFile : public util::Formattable {
+struct ICEBERG_EXPORT StatisticsFile {
   /// ID of the Iceberg table's snapshot the statistics file is associated with
   int64_t snapshot_id;
   /// Fully qualified path to the file
@@ -76,18 +72,16 @@ struct ICEBERG_EXPORT StatisticsFile : public 
util::Formattable {
 
   /// \brief Compare two StatisticsFiles for equality.
   friend bool operator==(const StatisticsFile& lhs, const StatisticsFile& rhs) 
{
-    return lhs.Equals(rhs);
+    return lhs.snapshot_id == rhs.snapshot_id && lhs.path == rhs.path &&
+           lhs.file_size_in_bytes == rhs.file_size_in_bytes &&
+           lhs.file_footer_size_in_bytes == rhs.file_footer_size_in_bytes &&
+           lhs.blob_metadata == rhs.blob_metadata;
   }
 
   /// \brief Compare two StatisticsFiles for inequality.
   friend bool operator!=(const StatisticsFile& lhs, const StatisticsFile& rhs) 
{
     return !(lhs == rhs);
   }
-
-  std::string ToString() const override;
-
- private:
-  bool Equals(const StatisticsFile& other) const;
 };
 
 /// \brief Represents a partition statistics file
@@ -99,6 +93,29 @@ struct ICEBERG_EXPORT PartitionStatisticsFile {
   std::string path;
   /// The size of the partition statistics file in bytes
   int64_t file_size_in_bytes;
+
+  /// \brief Compare two PartitionStatisticsFiles for equality.
+  friend bool operator==(const PartitionStatisticsFile& lhs,
+                         const PartitionStatisticsFile& rhs) {
+    return lhs.snapshot_id == rhs.snapshot_id && lhs.path == rhs.path &&
+           lhs.file_size_in_bytes == rhs.file_size_in_bytes;
+  }
+
+  /// \brief Compare two PartitionStatisticsFiles for inequality.
+  friend bool operator!=(const PartitionStatisticsFile& lhs,
+                         const PartitionStatisticsFile& rhs) {
+    return !(lhs == rhs);
+  }
 };
 
+/// \brief Returns a string representation of a BlobMetadata
+ICEBERG_EXPORT std::string ToString(const BlobMetadata& blob_metadata);
+
+/// \brief Returns a string representation of a StatisticsFile
+ICEBERG_EXPORT std::string ToString(const StatisticsFile& statistics_file);
+
+/// \brief Returns a string representation of a PartitionStatisticsFile
+ICEBERG_EXPORT std::string ToString(
+    const PartitionStatisticsFile& partition_statistics_file);
+
 }  // namespace iceberg
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index c30ddec..1710214 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -22,17 +22,26 @@
 #include <format>
 #include <string>
 
-#include "iceberg/statistics_file.h"
-
 namespace iceberg {
 
-std::string SnapshotLogEntry::ToString() const {
-  return std::format("SnapshotLogEntry[timestampMillis={},snapshotId={}]", 
timestamp_ms,
-                     snapshot_id);
+/// \brief Returns a string representation of a SnapshotLogEntry.
+std::string ToString(const SnapshotLogEntry& entry) {
+  const auto& timestamp = entry.timestamp_ms;
+  const auto& snapshot_id = entry.snapshot_id;
+  return std::format("SnapshotLogEntry[timestampMillis={},snapshotId={}]", timestamp,
+                     snapshot_id);
+}
+
+/// \brief Returns a string representation of a MetadataLogEntry.
+std::string ToString(const MetadataLogEntry& entry) {
+  const auto& timestamp = entry.timestamp_ms;
+  const auto& location = entry.metadata_file;
+  return std::format("MetadataLogEntry[timestampMillis={},file={}]", timestamp,
+                     location);
+}
+
+Result<TimePointMs> TimePointMsFromUnixMs(int64_t unix_ms) {
+  return TimePointMs{std::chrono::milliseconds(unix_ms)};
 }
 
-std::string MetadataLogEntry::ToString() const {
-  return std::format("MetadataLogEntry[timestampMillis={},file={}]", 
timestamp_ms, file);
+int64_t UnixMsFromTimePointMs(const TimePointMs& time_point_ms) {
+  return std::chrono::duration_cast<std::chrono::milliseconds>(
+             time_point_ms.time_since_epoch())
+      .count();
 }
 
 }  // namespace iceberg
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index e1665dc..2a1afcb 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -29,32 +29,51 @@
 #include <vector>
 
 #include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
-#include "iceberg/util/formattable.h"
 
 namespace iceberg {
 
+/// \brief A time point in milliseconds
 using TimePointMs =
     std::chrono::time_point<std::chrono::system_clock, 
std::chrono::milliseconds>;
 
+/// \brief Returns a TimePointMs from a Unix timestamp in milliseconds
+ICEBERG_EXPORT Result<TimePointMs> TimePointMsFromUnixMs(int64_t unix_ms);
+
+/// \brief Returns a Unix timestamp in milliseconds from a TimePointMs
+ICEBERG_EXPORT int64_t UnixMsFromTimePointMs(const TimePointMs& time_point_ms);
+
 /// \brief Represents a snapshot log entry
-struct ICEBERG_EXPORT SnapshotLogEntry : public util::Formattable {
+struct ICEBERG_EXPORT SnapshotLogEntry {
   /// The timestamp in milliseconds of the change
   TimePointMs timestamp_ms;
   /// ID of the snapshot
   int64_t snapshot_id;
 
-  std::string ToString() const override;
+  friend bool operator==(const SnapshotLogEntry& lhs, const SnapshotLogEntry& 
rhs) {
+    return lhs.timestamp_ms == rhs.timestamp_ms && lhs.snapshot_id == 
rhs.snapshot_id;
+  }
+
+  friend bool operator!=(const SnapshotLogEntry& lhs, const SnapshotLogEntry& 
rhs) {
+    return !(lhs == rhs);
+  }
 };
 
 /// \brief Represents a metadata log entry
-struct ICEBERG_EXPORT MetadataLogEntry : public util::Formattable {
+struct ICEBERG_EXPORT MetadataLogEntry {
   /// The timestamp in milliseconds of the change
   TimePointMs timestamp_ms;
   /// Metadata file location
-  std::string file;
+  std::string metadata_file;
+
+  friend bool operator==(const MetadataLogEntry& lhs, const MetadataLogEntry& 
rhs) {
+    return lhs.timestamp_ms == rhs.timestamp_ms && lhs.metadata_file == 
rhs.metadata_file;
+  }
 
-  std::string ToString() const override;
+  friend bool operator!=(const MetadataLogEntry& lhs, const MetadataLogEntry& 
rhs) {
+    return !(lhs == rhs);
+  }
 };
 
 /// \brief Represents the metadata for an Iceberg table
@@ -63,10 +82,18 @@ struct ICEBERG_EXPORT MetadataLogEntry : public 
util::Formattable {
 /// implementation, missing pieces including: 1) Map<Integer,
 /// Schema|PartitionSpec|SortOrder> 2) List<MetadataUpdate> 3) Map<Long, 
Snapshot> 4)
 /// Map<String, SnapshotRef>
-///
-/// TODO(wgtmac): Implement Equals and ToString once SortOrder and Snapshot are
-/// implemented.
 struct ICEBERG_EXPORT TableMetadata {
+  static constexpr int8_t kDefaultTableFormatVersion = 2;
+  static constexpr int8_t kSupportedTableFormatVersion = 3;
+  static constexpr int8_t kMinFormatVersionRowLineage = 3;
+  static constexpr int32_t kInitialSpecId = 0;
+  static constexpr int32_t kInitialSortOrderId = 1;
+  static constexpr int32_t kInitialSchemaId = 0;
+  static constexpr int64_t kInitialRowId = 0;
+  static constexpr int64_t kInitialSequenceNumber = 0;
+  static constexpr int64_t kInvalidSequenceNumber = -1;
+  static constexpr int64_t kInvalidSnapshotId = -1;
+
   /// An integer version number for the format
   int8_t format_version;
   /// A UUID that identifies the table
@@ -76,7 +103,7 @@ struct ICEBERG_EXPORT TableMetadata {
   /// The table's highest assigned sequence number
   int64_t last_sequence_number;
   /// Timestamp in milliseconds from the unix epoch when the table was last 
updated.
-  int64_t last_updated_ms;
+  TimePointMs last_updated_ms;
   /// The highest assigned column ID for the table
   int32_t last_column_id;
   /// A list of schemas
@@ -106,7 +133,7 @@ struct ICEBERG_EXPORT TableMetadata {
   /// Default sort order id of the table
   int32_t default_sort_order_id;
   /// A map of snapshot references
-  std::unordered_map<std::string, std::string> refs;
+  std::unordered_map<std::string, std::shared_ptr<SnapshotRef>> refs;
   /// A list of table statistics
   std::vector<std::shared_ptr<struct StatisticsFile>> statistics;
   /// A list of partition statistics
@@ -115,4 +142,10 @@ struct ICEBERG_EXPORT TableMetadata {
   int64_t next_row_id;
 };
 
+/// \brief Returns a string representation of a SnapshotLogEntry
+ICEBERG_EXPORT std::string ToString(const SnapshotLogEntry& entry);
+
+/// \brief Returns a string representation of a MetadataLogEntry
+ICEBERG_EXPORT std::string ToString(const MetadataLogEntry& entry);
+
 }  // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 44db1c0..ed8cc5f 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -88,24 +88,28 @@ struct TableIdentifier;
 
 class Catalog;
 class LocationProvider;
+class SortField;
+class SortOrder;
 class Table;
 class Transaction;
+class Transform;
+class TransformFunction;
+
+struct PartitionStatisticsFile;
+struct Snapshot;
+struct SnapshotRef;
+struct StatisticsFile;
+struct TableMetadata;
+
+enum class SnapshotRefType;
+enum class TransformType;
 
 /// 
----------------------------------------------------------------------------
 /// TODO: Forward declarations below are not added yet.
 /// 
----------------------------------------------------------------------------
 
 class HistoryEntry;
-class Snapshot;
-struct SnapshotRef;
-enum class SnapshotRefType;
-class SortField;
-class SortOrder;
 class StructLike;
-struct TableMetadata;
-class Transform;
-enum class TransformType;
-class TransformFunction;
 
 class MetadataUpdate;
 class UpdateRequirement;
diff --git a/test/schema_json_test.cc b/test/schema_json_test.cc
index 32c50da..c6549ab 100644
--- a/test/schema_json_test.cc
+++ b/test/schema_json_test.cc
@@ -41,7 +41,7 @@ class TypeJsonTest : public 
::testing::TestWithParam<SchemaJsonParam> {};
 TEST_P(TypeJsonTest, SingleTypeRoundTrip) {
   // To Json
   const auto& param = GetParam();
-  auto json = TypeToJson(*param.type).dump();
+  auto json = ToJson(*param.type).dump();
   ASSERT_EQ(param.json, json);
 
   // From Json
@@ -131,7 +131,7 @@ TEST(SchemaJsonTest, RoundTrip) {
   ASSERT_EQ(field2.type()->type_id(), TypeId::kString);
   ASSERT_TRUE(field2.optional());
 
-  auto dumped_json = SchemaToJson(*schema).dump();
+  auto dumped_json = ToJson(*schema).dump();
   ASSERT_EQ(dumped_json, json);
 }
 

Reply via email to