This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 23ed851 test: add manifest list test cases (#293)
23ed851 is described below
commit 23ed851fc4c749b515befd29a16fcb71611f268d
Author: Gang Wu <[email protected]>
AuthorDate: Mon Nov 10 11:59:14 2025 +0800
test: add manifest list test cases (#293)
- fix schema definition of manifest list & file with versions
- port manifest list test cases from java
- fix various issues found by new cases
Co-authored-by: Junwang Zhao <[email protected]>
---
src/iceberg/avro/avro_writer.cc | 6 +
src/iceberg/manifest_adapter.cc | 59 +--
src/iceberg/manifest_adapter.h | 22 +-
src/iceberg/manifest_entry.cc | 2 +-
src/iceberg/manifest_entry.h | 11 +
src/iceberg/manifest_list.cc | 25 +-
src/iceberg/manifest_list.h | 2 +-
src/iceberg/manifest_reader.cc | 17 +-
src/iceberg/manifest_writer.cc | 21 +-
src/iceberg/manifest_writer.h | 15 +-
src/iceberg/schema_field.h | 12 +
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/manifest_list_versions_test.cc | 487 ++++++++++++++++++++++++
src/iceberg/test/manifest_reader_writer_test.cc | 5 +-
src/iceberg/type_fwd.h | 1 +
src/iceberg/v1_metadata.cc | 137 +++----
src/iceberg/v1_metadata.h | 11 +-
src/iceberg/v2_metadata.cc | 140 ++++---
src/iceberg/v2_metadata.h | 11 +-
src/iceberg/v3_metadata.cc | 171 +++++----
src/iceberg/v3_metadata.h | 18 +-
21 files changed, 888 insertions(+), 286 deletions(-)
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index e76149b..4e86a68 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -55,6 +55,12 @@ Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions
class AvroWriter::Impl {
public:
+ ~Impl() {
+ if (arrow_schema_.release != nullptr) {
+ ArrowSchemaRelease(&arrow_schema_);
+ }
+ }
+
Status Open(const WriterOptions& options) {
write_schema_ = options.schema;
diff --git a/src/iceberg/manifest_adapter.cc b/src/iceberg/manifest_adapter.cc
index b568077..c0e267d 100644
--- a/src/iceberg/manifest_adapter.cc
+++ b/src/iceberg/manifest_adapter.cc
@@ -139,6 +139,14 @@ Result<ArrowArray*> ManifestAdapter::FinishAppending() {
return &array_;
}
+ManifestEntryAdapter::ManifestEntryAdapter(std::shared_ptr<PartitionSpec>
partition_spec,
+ ManifestContent content)
+ : partition_spec_(std::move(partition_spec)), content_(content) {
+ if (!partition_spec_) {
+ partition_spec_ = PartitionSpec::Unpartitioned();
+ }
+}
+
ManifestEntryAdapter::~ManifestEntryAdapter() {
if (array_.release != nullptr) {
ArrowArrayRelease(&array_);
@@ -148,14 +156,6 @@ ManifestEntryAdapter::~ManifestEntryAdapter() {
}
}
-Result<std::shared_ptr<StructType>>
ManifestEntryAdapter::GetManifestEntryType() {
- if (partition_spec_ == nullptr) [[unlikely]] {
- return ManifestEntry::TypeFromPartitionType(nullptr);
- }
- ICEBERG_ASSIGN_OR_RAISE(auto partition_type,
partition_spec_->PartitionType());
- return ManifestEntry::TypeFromPartitionType(std::move(partition_type));
-}
-
Status ManifestEntryAdapter::AppendPartitionValues(
ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
const std::vector<Literal>& partition_values) {
@@ -436,37 +436,6 @@ Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
return {};
}
-Status ManifestEntryAdapter::InitSchema(const std::unordered_set<int32_t>&
fields_ids) {
- ICEBERG_ASSIGN_OR_RAISE(auto manifest_entry_type, GetManifestEntryType())
- auto fields_span = manifest_entry_type->fields();
- std::vector<SchemaField> fields;
- // TODO(xiao.dong) Make this a common function to recursively handle
- // all nested fields in the schema
- for (const auto& field : fields_span) {
- if (field.field_id() == 2) {
- // handle data_file field
- auto data_file_struct =
internal::checked_pointer_cast<StructType>(field.type());
- std::vector<SchemaField> data_file_fields;
- for (const auto& data_file_field : data_file_struct->fields()) {
- if (fields_ids.contains(data_file_field.field_id())) {
- data_file_fields.emplace_back(data_file_field);
- }
- }
- auto type = std::make_shared<StructType>(data_file_fields);
- auto data_file_field = SchemaField::MakeRequired(
- field.field_id(), std::string(field.name()), std::move(type));
- fields.emplace_back(std::move(data_file_field));
- } else {
- if (fields_ids.contains(field.field_id())) {
- fields.emplace_back(field);
- }
- }
- }
- manifest_schema_ = std::make_shared<Schema>(fields);
- ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_schema_, &schema_));
- return {};
-}
-
ManifestFileAdapter::~ManifestFileAdapter() {
if (array_.release != nullptr) {
ArrowArrayRelease(&array_);
@@ -671,16 +640,4 @@ Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) {
return {};
}
-Status ManifestFileAdapter::InitSchema(const std::unordered_set<int32_t>&
fields_ids) {
- std::vector<SchemaField> fields;
- for (const auto& field : ManifestFile::Type().fields()) {
- if (fields_ids.contains(field.field_id())) {
- fields.emplace_back(field);
- }
- }
- manifest_list_schema_ = std::make_shared<Schema>(fields);
- ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_list_schema_, &schema_));
- return {};
-}
-
} // namespace iceberg
diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h
index 50f3b0c..8750506 100644
--- a/src/iceberg/manifest_adapter.h
+++ b/src/iceberg/manifest_adapter.h
@@ -61,22 +61,18 @@ class ICEBERG_EXPORT ManifestAdapter {
/// Implemented by different versions with version-specific schemas.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
- : partition_spec_(std::move(partition_spec)) {}
+ ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec,
+ ManifestContent content);
+
~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
- protected:
- virtual Result<std::shared_ptr<StructType>> GetManifestEntryType();
+ ManifestContent content() const { return content_; }
- /// \brief Initialize version-specific schema.
- ///
- /// \param fields_ids Field IDs to include in the manifest schema. The schema will be
- /// initialized to include only the fields with these IDs.
- Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ protected:
Status AppendInternal(const ManifestEntry& entry);
Status AppendDataFile(ArrowArray* array,
const std::shared_ptr<StructType>& data_file_type,
@@ -97,6 +93,7 @@ class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
protected:
std::shared_ptr<PartitionSpec> partition_spec_;
std::shared_ptr<Schema> manifest_schema_;
+ const ManifestContent content_;
};
/// \brief Adapter for appending a list of `ManifestFile`s to an `ArrowArray`.
@@ -110,12 +107,9 @@ class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
const std::shared_ptr<Schema>& schema() const { return
manifest_list_schema_; }
+ virtual std::optional<int64_t> next_row_id() const { return std::nullopt; }
+
protected:
- /// \brief Initialize version-specific schema.
- ///
- /// \param fields_ids Field IDs to include in the manifest list schema. The schema will
- /// be initialized to include only the fields with these IDs.
- Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
Status AppendInternal(const ManifestFile& file);
static Status AppendPartitionSummary(
ArrowArray* array, const std::shared_ptr<ListType>& summary_type,
diff --git a/src/iceberg/manifest_entry.cc b/src/iceberg/manifest_entry.cc
index 5a963b5..fee845d 100644
--- a/src/iceberg/manifest_entry.cc
+++ b/src/iceberg/manifest_entry.cc
@@ -38,7 +38,7 @@ bool ManifestEntry::operator==(const ManifestEntry& other) const {
std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType>
partition_type) {
if (!partition_type) {
- partition_type = PartitionSpec::Unpartitioned()->schema();
+ partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
}
return std::make_shared<StructType>(std::vector<SchemaField>{
kContent,
diff --git a/src/iceberg/manifest_entry.h b/src/iceberg/manifest_entry.h
index 2b9987b..c67a63e 100644
--- a/src/iceberg/manifest_entry.h
+++ b/src/iceberg/manifest_entry.h
@@ -57,6 +57,15 @@ ICEBERG_EXPORT constexpr Result<ManifestStatus> ManifestStatusFromInt(
}
}
+enum class ManifestContent {
+ kData = 0,
+ kDeletes = 1,
+};
+
+ICEBERG_EXPORT constexpr std::string_view ToString(ManifestContent content)
noexcept;
+ICEBERG_EXPORT constexpr Result<ManifestContent> ManifestContentFromString(
+ std::string_view str) noexcept;
+
/// \brief DataFile carries data file path, partition tuple, metrics, ...
struct ICEBERG_EXPORT DataFile {
/// \brief Content of a data file
@@ -185,6 +194,8 @@ struct ICEBERG_EXPORT DataFile {
101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet");
inline static const int32_t kPartitionFieldId = 102;
inline static const std::string kPartitionField = "partition";
+ inline static const std::string kPartitionDoc =
+ "Partition data tuple, schema based on the partition spec";
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
103, "record_count", iceberg::int64(), "Number of records in the file");
inline static const SchemaField kFileSize = SchemaField::MakeRequired(
diff --git a/src/iceberg/manifest_list.cc b/src/iceberg/manifest_list.cc
index b990760..853a018 100644
--- a/src/iceberg/manifest_list.cc
+++ b/src/iceberg/manifest_list.cc
@@ -33,12 +33,25 @@ const StructType& PartitionFieldSummary::Type() {
return kInstance;
}
-const StructType& ManifestFile::Type() {
- static const StructType kInstance(
- {kManifestPath, kManifestLength, kPartitionSpecId, kContent,
kSequenceNumber,
- kMinSequenceNumber, kAddedSnapshotId, kAddedFilesCount,
kExistingFilesCount,
- kDeletedFilesCount, kAddedRowsCount, kExistingRowsCount,
kDeletedRowsCount,
- kPartitions, kKeyMetadata, kFirstRowId});
+const std::shared_ptr<Schema>& ManifestFile::Type() {
+ static const auto kInstance =
std::make_shared<Schema>(std::vector<SchemaField>{
+ kManifestPath,
+ kManifestLength,
+ kPartitionSpecId,
+ kContent,
+ kSequenceNumber,
+ kMinSequenceNumber,
+ kAddedSnapshotId,
+ kAddedFilesCount,
+ kExistingFilesCount,
+ kDeletedFilesCount,
+ kAddedRowsCount,
+ kExistingRowsCount,
+ kDeletedRowsCount,
+ kPartitions,
+ kKeyMetadata,
+ kFirstRowId,
+ });
return kInstance;
}
diff --git a/src/iceberg/manifest_list.h b/src/iceberg/manifest_list.h
index d6e7f1d..17dc28c 100644
--- a/src/iceberg/manifest_list.h
+++ b/src/iceberg/manifest_list.h
@@ -197,7 +197,7 @@ struct ICEBERG_EXPORT ManifestFile {
bool operator==(const ManifestFile& other) const = default;
- static const StructType& Type();
+ static const std::shared_ptr<Schema>& Type();
};
/// Snapshots are embedded in table metadata, but the list of manifests for a snapshot are
diff --git a/src/iceberg/manifest_reader.cc b/src/iceberg/manifest_reader.cc
index 1e90852..60c15aa 100644
--- a/src/iceberg/manifest_reader.cc
+++ b/src/iceberg/manifest_reader.cc
@@ -71,14 +71,15 @@ Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
- std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
- ManifestFile::Type().fields().end());
- auto schema = std::make_shared<Schema>(fields);
- ICEBERG_ASSIGN_OR_RAISE(auto reader, ReaderFactoryRegistry::Open(
- FileFormatType::kAvro,
- {.path =
std::string(manifest_list_location),
- .io = std::move(file_io),
- .projection = schema}));
+ std::shared_ptr<Schema> schema = ManifestFile::Type();
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto reader,
+ ReaderFactoryRegistry::Open(FileFormatType::kAvro,
+ {
+ .path =
std::string(manifest_list_location),
+ .io = std::move(file_io),
+ .projection = schema,
+ }));
return std::make_unique<ManifestListReaderImpl>(std::move(reader),
std::move(schema));
}
diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc
index a27fb45..8b295e1 100644
--- a/src/iceberg/manifest_writer.cc
+++ b/src/iceberg/manifest_writer.cc
@@ -53,6 +53,8 @@ Status ManifestWriter::Close() {
return writer_->Close();
}
+ManifestContent ManifestWriter::content() const { return adapter_->content(); }
+
Result<std::unique_ptr<Writer>> OpenFileWriter(
std::string_view location, std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> file_io,
@@ -83,9 +85,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
- std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec>
partition_spec) {
- auto adapter =
- std::make_unique<ManifestEntryAdapterV2>(snapshot_id,
std::move(partition_spec));
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec>
partition_spec,
+ ManifestContent content) {
+ auto adapter = std::make_unique<ManifestEntryAdapterV2>(
+ snapshot_id, std::move(partition_spec), content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
@@ -99,9 +102,9 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<PartitionSpec> partition_spec) {
- auto adapter = std::make_unique<ManifestEntryAdapterV3>(snapshot_id,
first_row_id,
-
std::move(partition_spec));
+ std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content) {
+ auto adapter = std::make_unique<ManifestEntryAdapterV3>(
+ snapshot_id, first_row_id, std::move(partition_spec), content);
ICEBERG_RETURN_UNEXPECTED(adapter->Init());
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
@@ -136,6 +139,10 @@ Status ManifestListWriter::Close() {
return writer_->Close();
}
+std::optional<int64_t> ManifestListWriter::next_row_id() const {
+ return adapter_->next_row_id();
+}
+
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
@@ -169,7 +176,7 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
- int64_t sequence_number, std::optional<int64_t> first_row_id,
+ int64_t sequence_number, int64_t first_row_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
auto adapter = std::make_unique<ManifestFileAdapterV3>(snapshot_id,
parent_snapshot_id,
sequence_number,
first_row_id);
diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h
index 69e1912..be6c831 100644
--- a/src/iceberg/manifest_writer.h
+++ b/src/iceberg/manifest_writer.h
@@ -54,6 +54,9 @@ class ICEBERG_EXPORT ManifestWriter {
/// \brief Close writer and flush to storage.
Status Close();
+ /// \brief Get the content of the manifest.
+ ManifestContent content() const;
+
/// \brief Creates a writer for a manifest file.
/// \param snapshot_id ID of the snapshot.
/// \param manifest_location Path to the manifest file.
@@ -69,10 +72,12 @@ class ICEBERG_EXPORT ManifestWriter {
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_spec Partition spec for the manifest.
+ /// \param content Content of the manifest.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
- std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec>
partition_spec);
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec>
partition_spec,
+ ManifestContent content);
/// \brief Creates a writer for a manifest file.
/// \param snapshot_id ID of the snapshot.
@@ -80,11 +85,12 @@ class ICEBERG_EXPORT ManifestWriter {
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
/// \param partition_spec Partition spec for the manifest.
+ /// \param content Content of the manifest.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<PartitionSpec> partition_spec);
+ std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content);
private:
static constexpr int64_t kBatchSize = 1024;
@@ -114,6 +120,9 @@ class ICEBERG_EXPORT ManifestListWriter {
/// \brief Close writer and flush to storage.
Status Close();
+ /// \brief Get the next row id to assign.
+ std::optional<int64_t> next_row_id() const;
+
/// \brief Creates a writer for the v1 manifest list.
/// \param snapshot_id ID of the snapshot.
/// \param parent_snapshot_id ID of the parent snapshot.
@@ -146,7 +155,7 @@ class ICEBERG_EXPORT ManifestListWriter {
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestListWriter>> MakeV3Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
- int64_t sequence_number, std::optional<int64_t> first_row_id,
+ int64_t sequence_number, int64_t first_row_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO>
file_io);
private:
diff --git a/src/iceberg/schema_field.h b/src/iceberg/schema_field.h
index 4190dba..c7c826d 100644
--- a/src/iceberg/schema_field.h
+++ b/src/iceberg/schema_field.h
@@ -79,6 +79,18 @@ class ICEBERG_EXPORT SchemaField : public iceberg::util::Formattable {
return lhs.Equals(rhs);
}
+ SchemaField AsRequired() const {
+ auto copy = *this;
+ copy.optional_ = false;
+ return copy;
+ }
+
+ SchemaField AsOptional() const {
+ auto copy = *this;
+ copy.optional_ = true;
+ return copy;
+ }
+
private:
/// \brief Compare two fields for equality.
[[nodiscard]] bool Equals(const SchemaField& other) const;
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index fd8cbc9..544e92f 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -121,6 +121,7 @@ if(ICEBERG_BUILD_BUNDLE)
avro_schema_test.cc
avro_stream_test.cc
manifest_list_reader_writer_test.cc
+ manifest_list_versions_test.cc
manifest_reader_writer_test.cc
test_common.cc)
diff --git a/src/iceberg/test/manifest_list_versions_test.cc b/src/iceberg/test/manifest_list_versions_test.cc
new file mode 100644
index 0000000..1f329ba
--- /dev/null
+++ b/src/iceberg/test/manifest_list_versions_test.cc
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <optional>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_reader.h"
+#include "iceberg/manifest_writer.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/v1_metadata.h"
+
+namespace iceberg {
+
+constexpr int kRowLineageFormatVersion = 3;
+constexpr const char* kPath = "s3://bucket/table/m1.avro";
+constexpr int64_t kLength = 1024L;
+constexpr int32_t kSpecId = 1;
+constexpr int64_t kSeqNum = 34L;
+constexpr int64_t kMinSeqNum = 10L;
+constexpr int64_t kSnapshotId = 987134631982734L;
+constexpr int32_t kAddedFiles = 2;
+constexpr int64_t kAddedRows = 5292L;
+constexpr int32_t kExistingFiles = 343;
+constexpr int64_t kExistingRows = 857273L;
+constexpr int32_t kDeletedFiles = 1;
+constexpr int64_t kDeletedRows = 22910L;
+constexpr int64_t kFirstRowId = 100L;
+constexpr int64_t kSnapshotFirstRowId = 130L;
+
+const static auto kTestManifest = ManifestFile{
+ .manifest_path = kPath,
+ .manifest_length = kLength,
+ .partition_spec_id = kSpecId,
+ .content = ManifestFile::Content::kData,
+ .sequence_number = kSeqNum,
+ .min_sequence_number = kMinSeqNum,
+ .added_snapshot_id = kSnapshotId,
+ .added_files_count = kAddedFiles,
+ .existing_files_count = kExistingFiles,
+ .deleted_files_count = kDeletedFiles,
+ .added_rows_count = kAddedRows,
+ .existing_rows_count = kExistingRows,
+ .deleted_rows_count = kDeletedRows,
+ .partitions = {},
+ .key_metadata = {},
+ .first_row_id = kFirstRowId,
+};
+
+const static auto kDeleteManifest = ManifestFile{
+ .manifest_path = kPath,
+ .manifest_length = kLength,
+ .partition_spec_id = kSpecId,
+ .content = ManifestFile::Content::kDeletes,
+ .sequence_number = kSeqNum,
+ .min_sequence_number = kMinSeqNum,
+ .added_snapshot_id = kSnapshotId,
+ .added_files_count = kAddedFiles,
+ .existing_files_count = kExistingFiles,
+ .deleted_files_count = kDeletedFiles,
+ .added_rows_count = kAddedRows,
+ .existing_rows_count = kExistingRows,
+ .deleted_rows_count = kDeletedRows,
+ .first_row_id = std::nullopt,
+};
+
+class TestManifestListVersions : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ avro::RegisterAll();
+ file_io_ = iceberg::arrow::MakeMockFileIO();
+ }
+
+ static std::string CreateManifestListPath() {
+ return std::format("manifest-list-{}.avro",
+
std::chrono::system_clock::now().time_since_epoch().count());
+ }
+
+ std::string WriteManifestList(int format_version, int64_t
expected_next_row_id,
+ const std::vector<ManifestFile>& manifests)
const {
+ const std::string manifest_list_path = CreateManifestListPath();
+ constexpr int64_t kParentSnapshotId = kSnapshotId - 1;
+
+ Result<std::unique_ptr<ManifestListWriter>> writer_result =
+ NotSupported("Format version: {}", format_version);
+
+ if (format_version == 1) {
+ writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId,
kParentSnapshotId,
+ manifest_list_path,
file_io_);
+ } else if (format_version == 2) {
+ writer_result = ManifestListWriter::MakeV2Writer(
+ kSnapshotId, kParentSnapshotId, kSeqNum, manifest_list_path,
file_io_);
+ } else if (format_version == 3) {
+ writer_result = ManifestListWriter::MakeV3Writer(kSnapshotId,
kParentSnapshotId,
+ kSeqNum,
kSnapshotFirstRowId,
+ manifest_list_path,
file_io_);
+ }
+
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ EXPECT_THAT(writer->AddAll(manifests), IsOk());
+ EXPECT_THAT(writer->Close(), IsOk());
+
+ if (format_version >= kRowLineageFormatVersion) {
+ EXPECT_EQ(writer->next_row_id(),
std::make_optional(expected_next_row_id));
+ } else {
+ EXPECT_FALSE(writer->next_row_id().has_value());
+ }
+
+ return manifest_list_path;
+ }
+
+ ManifestFile WriteAndReadManifestList(int format_version) const {
+ return ReadManifestList(
+ WriteManifestList(format_version, kSnapshotFirstRowId,
{kTestManifest}));
+ }
+
+ ManifestFile ReadManifestList(const std::string& manifest_list_path) const {
+ auto reader_result = ManifestListReader::Make(manifest_list_path,
file_io_);
+ EXPECT_THAT(reader_result, IsOk());
+
+ auto reader = std::move(reader_result.value());
+ auto files_result = reader->Files();
+ EXPECT_THAT(files_result, IsOk());
+
+ auto manifests = files_result.value();
+ EXPECT_EQ(manifests.size(), 1);
+
+ return manifests[0];
+ }
+
+ std::vector<ManifestFile> ReadAllManifests(
+ const std::string& manifest_list_path) const {
+ auto reader_result = ManifestListReader::Make(manifest_list_path,
file_io_);
+ EXPECT_THAT(reader_result, IsOk());
+
+ auto reader = std::move(reader_result.value());
+ auto files_result = reader->Files();
+ EXPECT_THAT(files_result, IsOk());
+
+ return files_result.value();
+ }
+
+ void ReadAvro(const std::string& path, const std::shared_ptr<Schema>& schema,
+ const std::string& expected_json) const {
+ auto reader_result = ReaderFactoryRegistry::Open(
+ FileFormatType::kAvro, {.path = path, .io = file_io_, .projection =
schema});
+ EXPECT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ auto arrow_schema_result = reader->Schema();
+ EXPECT_THAT(arrow_schema_result, IsOk());
+ auto arrow_c_schema = std::move(arrow_schema_result.value());
+ auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+ auto expected_array =
+ ::arrow::json::ArrayFromJSONString(arrow_schema,
expected_json).ValueOrDie();
+
+ auto batch_result = reader->Next();
+ EXPECT_THAT(batch_result, IsOk());
+ EXPECT_TRUE(batch_result.value().has_value());
+ auto arrow_c_batch = std::move(batch_result.value().value());
+
+ auto arrow_batch_result =
+ ::arrow::ImportArray(&arrow_c_batch, std::move(arrow_schema));
+ auto array = arrow_batch_result.ValueOrDie();
+ EXPECT_TRUE(array != nullptr);
+ EXPECT_TRUE(expected_array->Equals(*array));
+ }
+
+ std::shared_ptr<FileIO> file_io_;
+};
+
+TEST_F(TestManifestListVersions, TestV1WriteDeleteManifest) {
+ const std::string manifest_list_path = CreateManifestListPath();
+
+ auto writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId,
kSnapshotId - 1,
+ manifest_list_path,
file_io_);
+ EXPECT_THAT(writer_result, IsOk());
+
+ auto writer = std::move(writer_result.value());
+ auto status = writer->Add(kDeleteManifest);
+
+ EXPECT_THAT(status, IsError(ErrorKind::kInvalidManifestList));
+ EXPECT_THAT(status, HasErrorMessage("Cannot store delete manifests in a v1 table"));
+}
+
+TEST_F(TestManifestListVersions, TestV1Write) {
+ auto manifest = WriteAndReadManifestList(/*format_version=*/1);
+
+ // V3 fields are not written and are defaulted
+ EXPECT_FALSE(manifest.first_row_id.has_value());
+
+ // V2 fields are not written and are defaulted
+ EXPECT_EQ(manifest.sequence_number, 0);
+ EXPECT_EQ(manifest.min_sequence_number, 0);
+
+ // V1 fields are read correctly
+ EXPECT_EQ(manifest.manifest_path, kPath);
+ EXPECT_EQ(manifest.manifest_length, kLength);
+ EXPECT_EQ(manifest.partition_spec_id, kSpecId);
+ EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
+ EXPECT_EQ(manifest.added_files_count, kAddedFiles);
+ EXPECT_EQ(manifest.existing_files_count, kExistingFiles);
+ EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles);
+ EXPECT_EQ(manifest.added_rows_count, kAddedRows);
+ EXPECT_EQ(manifest.existing_rows_count, kExistingRows);
+ EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows);
+}
+
+TEST_F(TestManifestListVersions, TestV2Write) {
+ auto manifest = WriteAndReadManifestList(2);
+
+ // V3 fields are not written and are defaulted
+ EXPECT_FALSE(manifest.first_row_id.has_value());
+
+ // All V2 fields should be read correctly
+ EXPECT_EQ(manifest.manifest_path, kPath);
+ EXPECT_EQ(manifest.manifest_length, kLength);
+ EXPECT_EQ(manifest.partition_spec_id, kSpecId);
+ EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.sequence_number, kSeqNum);
+ EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum);
+ EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
+ EXPECT_EQ(manifest.added_files_count, kAddedFiles);
+ EXPECT_EQ(manifest.added_rows_count, kAddedRows);
+ EXPECT_EQ(manifest.existing_files_count, kExistingFiles);
+ EXPECT_EQ(manifest.existing_rows_count, kExistingRows);
+ EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles);
+ EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows);
+}
+
+TEST_F(TestManifestListVersions, TestV3Write) {
+ auto manifest = WriteAndReadManifestList(/*format_version=*/3);
+
+ // All V3 fields should be read correctly
+ EXPECT_EQ(manifest.manifest_path, kPath);
+ EXPECT_EQ(manifest.manifest_length, kLength);
+ EXPECT_EQ(manifest.partition_spec_id, kSpecId);
+ EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.sequence_number, kSeqNum);
+ EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum);
+ EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
+ EXPECT_EQ(manifest.added_files_count, kAddedFiles);
+ EXPECT_EQ(manifest.added_rows_count, kAddedRows);
+ EXPECT_EQ(manifest.existing_files_count, kExistingFiles);
+ EXPECT_EQ(manifest.existing_rows_count, kExistingRows);
+ EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles);
+ EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows);
+ EXPECT_TRUE(manifest.first_row_id.has_value());
+ EXPECT_EQ(manifest.first_row_id.value(), kFirstRowId);
+}
+
+TEST_F(TestManifestListVersions, TestV3WriteFirstRowIdAssignment) {
+ ManifestFile missing_first_row_id = kTestManifest;
+ missing_first_row_id.first_row_id = std::nullopt;
+
+ constexpr int64_t kExpectedNextRowId = kSnapshotFirstRowId + kAddedRows +
kExistingRows;
+ auto manifest_list_path =
+ WriteManifestList(/*format_version=*/3, kExpectedNextRowId,
{missing_first_row_id});
+
+ auto manifest = ReadManifestList(manifest_list_path);
+ EXPECT_EQ(manifest.manifest_path, kPath);
+ EXPECT_EQ(manifest.manifest_length, kLength);
+ EXPECT_EQ(manifest.partition_spec_id, kSpecId);
+ EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.sequence_number, kSeqNum);
+ EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum);
+ EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
+ EXPECT_EQ(manifest.added_files_count, kAddedFiles);
+ EXPECT_EQ(manifest.added_rows_count, kAddedRows);
+ EXPECT_EQ(manifest.existing_files_count, kExistingFiles);
+ EXPECT_EQ(manifest.existing_rows_count, kExistingRows);
+ EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles);
+ EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows);
+ EXPECT_EQ(manifest.first_row_id, std::make_optional(kSnapshotFirstRowId));
+}
+
+TEST_F(TestManifestListVersions, TestV3WriteMixedRowIdAssignment) {
+ ManifestFile missing_first_row_id = kTestManifest;
+ missing_first_row_id.first_row_id = std::nullopt;
+
+ constexpr int64_t kExpectedNextRowId =
+ kSnapshotFirstRowId + 2 * (kAddedRows + kExistingRows);
+
+ auto manifest_list_path = WriteManifestList(
+ 3, kExpectedNextRowId, {missing_first_row_id, kTestManifest,
missing_first_row_id});
+
+ auto manifests = ReadAllManifests(manifest_list_path);
+ EXPECT_EQ(manifests.size(), 3);
+
+ // all v2 fields should be read correctly
+ for (const auto& manifest : manifests) {
+ EXPECT_EQ(manifest.manifest_path, kPath);
+ EXPECT_EQ(manifest.manifest_length, kLength);
+ EXPECT_EQ(manifest.partition_spec_id, kSpecId);
+ EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+ EXPECT_EQ(manifest.sequence_number, kSeqNum);
+ EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum);
+ EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
+ EXPECT_EQ(manifest.added_files_count, kAddedFiles);
+ EXPECT_EQ(manifest.added_rows_count, kAddedRows);
+ EXPECT_EQ(manifest.existing_files_count, kExistingFiles);
+ EXPECT_EQ(manifest.existing_rows_count, kExistingRows);
+ EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles);
+ EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows);
+ }
+
+ EXPECT_EQ(manifests[0].first_row_id, std::make_optional(kSnapshotFirstRowId));
+ EXPECT_EQ(manifests[1].first_row_id, kTestManifest.first_row_id);
+ EXPECT_EQ(manifests[2].first_row_id,
+ std::make_optional(kSnapshotFirstRowId + kAddedRows + kExistingRows));
+}
+
+TEST_F(TestManifestListVersions, TestV1ForwardCompatibility) {
+ std::string manifest_list_path =
+ WriteManifestList(/*format_version=*/1, kSnapshotFirstRowId, {kTestManifest});
+ std::string expected_array_json = R"([
+ ["s3://bucket/table/m1.avro", 1024, 1, 987134631982734, 2, 343, 1, [], 5292, 857273, 22910, null]
+ ])";
+ ReadAvro(manifest_list_path, ManifestFileAdapterV1::kManifestListSchema,
+ expected_array_json);
+}
+
+TEST_F(TestManifestListVersions, TestV2ForwardCompatibility) {
+ // V2 manifest list files can be read by V1 readers, but the sequence numbers and
+ // content will be ignored.
+ std::string manifest_list_path =
+ WriteManifestList(/*format_version=*/2, kSnapshotFirstRowId, {kTestManifest});
+ std::string expected_array_json = R"([
+ ["s3://bucket/table/m1.avro", 1024, 1, 987134631982734, 2, 343, 1, [], 5292, 857273, 22910, null]
+ ])";
+ ReadAvro(manifest_list_path, ManifestFileAdapterV1::kManifestListSchema,
+ expected_array_json);
+}
+
+TEST_F(TestManifestListVersions, TestManifestsWithoutRowStats) {
+ // Create a schema without row stats columns to simulate an old manifest list file
+ auto schema_without_stats = std::make_shared<Schema>(std::vector<SchemaField>{
+ ManifestFile::kManifestPath,
+ ManifestFile::kManifestLength,
+ ManifestFile::kPartitionSpecId,
+ ManifestFile::kAddedSnapshotId,
+ ManifestFile::kAddedFilesCount,
+ ManifestFile::kExistingFilesCount,
+ ManifestFile::kDeletedFilesCount,
+ ManifestFile::kPartitions,
+ });
+
+ ArrowSchema arrow_c_schema;
+ EXPECT_THAT(ToArrowSchema(*schema_without_stats, &arrow_c_schema), IsOk());
+ auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+ std::string json_data = R"([["path/to/manifest.avro", 1024, 1, 100, 2, 3, 4, null]])";
+ auto array = ::arrow::json::ArrayFromJSONString(arrow_schema, json_data).ValueOrDie();
+ ArrowArray arrow_array;
+ EXPECT_TRUE(::arrow::ExportArray(*array, &arrow_array).ok());
+
+ std::string manifest_list_path = CreateManifestListPath();
+ auto writer_result = WriterFactoryRegistry::Open(
+ FileFormatType::kAvro,
+ {.path = manifest_list_path, .schema = schema_without_stats, .io = file_io_});
+ EXPECT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+ EXPECT_THAT(writer->Write(&arrow_array), IsOk());
+ EXPECT_THAT(writer->Close(), IsOk());
+
+ // Read back and verify
+ auto manifest = ReadManifestList(manifest_list_path);
+
+ EXPECT_EQ(manifest.manifest_path, "path/to/manifest.avro");
+ EXPECT_EQ(manifest.manifest_length, 1024L);
+ EXPECT_EQ(manifest.partition_spec_id, 1);
+ EXPECT_EQ(manifest.added_snapshot_id, 100L);
+
+ EXPECT_TRUE(manifest.has_added_files());
+ EXPECT_EQ(manifest.added_files_count, 2);
+ EXPECT_FALSE(manifest.added_rows_count.has_value());
+
+ EXPECT_TRUE(manifest.has_existing_files());
+ EXPECT_EQ(manifest.existing_files_count, 3);
+ EXPECT_FALSE(manifest.existing_rows_count.has_value());
+
+ EXPECT_TRUE(manifest.has_deleted_files());
+ EXPECT_EQ(manifest.deleted_files_count, 4);
+ EXPECT_FALSE(manifest.deleted_rows_count.has_value());
+
+ EXPECT_FALSE(manifest.first_row_id.has_value());
+}
+
+TEST_F(TestManifestListVersions, TestManifestsPartitionSummary) {
+ auto first_summary_lower_bound = Literal::Int(10).Serialize().value();
+ auto first_summary_upper_bound = Literal::Int(100).Serialize().value();
+ auto second_summary_lower_bound = Literal::Int(20).Serialize().value();
+ auto second_summary_upper_bound = Literal::Int(200).Serialize().value();
+
+ std::vector<PartitionFieldSummary> partition_summaries{
+ PartitionFieldSummary{
+ .contains_null = false,
+ .contains_nan = std::nullopt,
+ .lower_bound = first_summary_lower_bound,
+ .upper_bound = first_summary_upper_bound,
+ },
+ PartitionFieldSummary{
+ .contains_null = true,
+ .contains_nan = false,
+ .lower_bound = second_summary_lower_bound,
+ .upper_bound = second_summary_upper_bound,
+ },
+ };
+
+ ManifestFile manifest{
+ .manifest_path = kPath,
+ .manifest_length = kLength,
+ .partition_spec_id = kSpecId,
+ .content = ManifestFile::Content::kData,
+ .sequence_number = kSeqNum,
+ .min_sequence_number = kMinSeqNum,
+ .added_snapshot_id = kSnapshotId,
+ .added_files_count = kAddedFiles,
+ .existing_files_count = kExistingFiles,
+ .deleted_files_count = kDeletedFiles,
+ .added_rows_count = kAddedRows,
+ .existing_rows_count = kExistingRows,
+ .deleted_rows_count = kDeletedRows,
+ .partitions = partition_summaries,
+ .key_metadata = {},
+ .first_row_id = std::nullopt,
+ };
+
+ // Test for all format versions
+ for (int format_version = 1; format_version <= 3; ++format_version) {
+ int64_t expected_next_row_id = kSnapshotFirstRowId +
+ manifest.added_rows_count.value() +
+ manifest.existing_rows_count.value();
+
+ auto manifest_list_path =
+ WriteManifestList(format_version, expected_next_row_id, {manifest});
+
+ auto returned_manifest = ReadManifestList(manifest_list_path);
+ EXPECT_EQ(returned_manifest.partitions.size(), 2);
+
+ const auto& first = returned_manifest.partitions[0];
+ EXPECT_FALSE(first.contains_null);
+ EXPECT_FALSE(first.contains_nan.has_value());
+ EXPECT_EQ(first.lower_bound, first_summary_lower_bound);
+ EXPECT_EQ(first.upper_bound, first_summary_upper_bound);
+
+ const auto& second = returned_manifest.partitions[1];
+ EXPECT_TRUE(second.contains_null);
+ EXPECT_TRUE(second.contains_nan.has_value());
+ EXPECT_FALSE(second.contains_nan.value());
+ EXPECT_EQ(second.lower_bound, second_summary_lower_bound);
+ EXPECT_EQ(second.upper_bound, second_summary_upper_bound);
+ }
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/manifest_reader_writer_test.cc b/src/iceberg/test/manifest_reader_writer_test.cc
index f4e770f..d625d5c 100644
--- a/src/iceberg/test/manifest_reader_writer_test.cc
+++ b/src/iceberg/test/manifest_reader_writer_test.cc
@@ -260,8 +260,9 @@ class ManifestReaderV2Test : public ManifestReaderTestBase {
void TestWriteManifest(int64_t snapshot_id, const std::string& manifest_list_path,
std::shared_ptr<PartitionSpec> partition_spec,
const std::vector<ManifestEntry>& manifest_entries) {
- auto result = ManifestWriter::MakeV2Writer(snapshot_id, manifest_list_path, file_io_,
- std::move(partition_spec));
+ auto result =
+ ManifestWriter::MakeV2Writer(snapshot_id, manifest_list_path, file_io_,
+ std::move(partition_spec), ManifestContent::kData);
ASSERT_TRUE(result.has_value()) << result.error().message;
auto writer = std::move(result.value());
auto status = writer->AddAll(manifest_entries);
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 4367448..62608a9 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -114,6 +114,7 @@ class NameMapping;
enum class SnapshotRefType;
enum class TransformType;
+enum class ManifestContent;
class Decimal;
class Uuid;
diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc
index 52c52cd..917d30a 100644
--- a/src/iceberg/v1_metadata.cc
+++ b/src/iceberg/v1_metadata.cc
@@ -23,91 +23,98 @@
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"
namespace iceberg {
-Status ManifestEntryAdapterV1::Init() {
- static std::unordered_set<int32_t> kManifestEntryFieldIds{
- ManifestEntry::kStatus.field_id(),
- ManifestEntry::kSnapshotId.field_id(),
- ManifestEntry::kDataFileFieldId,
- DataFile::kFilePath.field_id(),
- DataFile::kFileFormat.field_id(),
- DataFile::kPartitionFieldId,
- DataFile::kRecordCount.field_id(),
- DataFile::kFileSize.field_id(),
- 105, // kBlockSizeInBytes field id
- DataFile::kColumnSizes.field_id(),
- DataFile::kValueCounts.field_id(),
- DataFile::kNullValueCounts.field_id(),
- DataFile::kNanValueCounts.field_id(),
- DataFile::kLowerBounds.field_id(),
- DataFile::kUpperBounds.field_id(),
- DataFile::kKeyMetadata.field_id(),
- DataFile::kSplitOffsets.field_id(),
- DataFile::kSortOrderId.field_id(),
- };
- ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
- ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
- if (partition_spec_ != nullptr) {
- ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
- metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
- }
- metadata_["format-version"] = "1";
- return {};
+ManifestEntryAdapterV1::ManifestEntryAdapterV1(
+ std::optional<int64_t> snapshot_id, std::shared_ptr<PartitionSpec> partition_spec)
+ : ManifestEntryAdapter(std::move(partition_spec), ManifestContent::kData),
+ snapshot_id_(snapshot_id) {}
+
+std::shared_ptr<Schema> ManifestEntryAdapterV1::EntrySchema(
+ std::shared_ptr<StructType> partition_type) {
+ return WrapFileSchema(DataFileSchema(std::move(partition_type)));
}
-Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) {
- return AppendInternal(entry);
+std::shared_ptr<Schema> ManifestEntryAdapterV1::WrapFileSchema(
+ std::shared_ptr<StructType> file_schema) {
+ return std::make_shared<Schema>(std::vector<SchemaField>{
+ ManifestEntry::kStatus,
+ ManifestEntry::kSnapshotId,
+ SchemaField::MakeRequired(ManifestEntry::kDataFileFieldId,
+ ManifestEntry::kDataFileField, std::move(file_schema)),
+ });
+}
+
+std::shared_ptr<StructType> ManifestEntryAdapterV1::DataFileSchema(
+ std::shared_ptr<StructType> partition_type) {
+ return std::make_shared<StructType>(std::vector<SchemaField>{
+ DataFile::kFilePath,
+ DataFile::kFileFormat,
+ SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField,
+ std::move(partition_type)),
+ DataFile::kRecordCount,
+ DataFile::kFileSize,
+ SchemaField::MakeRequired(105, "block_size_in_bytes", int64(),
+ "Block size in bytes"),
+ DataFile::kColumnSizes,
+ DataFile::kValueCounts,
+ DataFile::kNullValueCounts,
+ DataFile::kNanValueCounts,
+ DataFile::kLowerBounds,
+ DataFile::kUpperBounds,
+ DataFile::kKeyMetadata,
+ DataFile::kSplitOffsets,
+ DataFile::kSortOrderId,
+ });
}
-Result<std::shared_ptr<StructType>> ManifestEntryAdapterV1::GetManifestEntryType() {
- // 'block_size_in_bytes' (ID 105) is a deprecated field that is REQUIRED
- // in the v1 data_file schema for backward compatibility.
- // Deprecated. Always write a default in v1. Do not write in v2 or v3.
- static const SchemaField kBlockSizeInBytes = SchemaField::MakeRequired(
- 105, "block_size_in_bytes", int64(), "Block size in bytes");
+Status ManifestEntryAdapterV1::Init() {
+ // TODO(gangwu): fix the schema to use current table schema.
+ // ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
+ metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
+ metadata_["format-version"] = "1";
+
ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType());
if (!partition_type) {
- partition_type = PartitionSpec::Unpartitioned()->schema();
+ partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
}
- auto datafile_type = std::make_shared<StructType>(std::vector<SchemaField>{
- DataFile::kFilePath, DataFile::kFileFormat,
- SchemaField::MakeRequired(102, DataFile::kPartitionField,
- std::move(partition_type)),
- DataFile::kRecordCount, DataFile::kFileSize, kBlockSizeInBytes,
- DataFile::kColumnSizes, DataFile::kValueCounts, DataFile::kNullValueCounts,
- DataFile::kNanValueCounts, DataFile::kLowerBounds, DataFile::kUpperBounds,
- DataFile::kKeyMetadata, DataFile::kSplitOffsets, DataFile::kSortOrderId});
+ manifest_schema_ = EntrySchema(std::move(partition_type));
+ return ToArrowSchema(*manifest_schema_, &schema_);
+}
- return std::make_shared<StructType>(
- std::vector<SchemaField>{ManifestEntry::kStatus, ManifestEntry::kSnapshotId,
- SchemaField::MakeRequired(2, ManifestEntry::kDataFileField,
- std::move(datafile_type))});
+Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) {
+ return AppendInternal(entry);
}
+const std::shared_ptr<Schema> ManifestFileAdapterV1::kManifestListSchema =
+ std::make_shared<Schema>(std::vector<SchemaField>{
+ ManifestFile::kManifestPath,
+ ManifestFile::kManifestLength,
+ ManifestFile::kPartitionSpecId,
+ ManifestFile::kAddedSnapshotId,
+ ManifestFile::kAddedFilesCount,
+ ManifestFile::kExistingFilesCount,
+ ManifestFile::kDeletedFilesCount,
+ ManifestFile::kPartitions,
+ ManifestFile::kAddedRowsCount,
+ ManifestFile::kExistingRowsCount,
+ ManifestFile::kDeletedRowsCount,
+ ManifestFile::kKeyMetadata,
+ });
+
Status ManifestFileAdapterV1::Init() {
- static std::unordered_set<int32_t> kManifestFileFieldIds{
- ManifestFile::kManifestPath.field_id(),
- ManifestFile::kManifestLength.field_id(),
- ManifestFile::kPartitionSpecId.field_id(),
- ManifestFile::kAddedSnapshotId.field_id(),
- ManifestFile::kAddedFilesCount.field_id(),
- ManifestFile::kExistingFilesCount.field_id(),
- ManifestFile::kDeletedFilesCount.field_id(),
- ManifestFile::kAddedRowsCount.field_id(),
- ManifestFile::kExistingRowsCount.field_id(),
- ManifestFile::kDeletedRowsCount.field_id(),
- ManifestFile::kPartitions.field_id(),
- ManifestFile::kKeyMetadata.field_id(),
- };
metadata_["snapshot-id"] = std::to_string(snapshot_id_);
metadata_["parent-snapshot-id"] = parent_snapshot_id_.has_value()
? std::to_string(parent_snapshot_id_.value())
: "null";
metadata_["format-version"] = "1";
- return InitSchema(kManifestFileFieldIds);
+
+ manifest_list_schema_ = kManifestListSchema;
+ return ToArrowSchema(*manifest_list_schema_, &schema_);
}
Status ManifestFileAdapterV1::Append(const ManifestFile& file) {
diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h
index 3c095a9..d971901 100644
--- a/src/iceberg/v1_metadata.h
+++ b/src/iceberg/v1_metadata.h
@@ -28,13 +28,14 @@ namespace iceberg {
class ManifestEntryAdapterV1 : public ManifestEntryAdapter {
public:
ManifestEntryAdapterV1(std::optional<int64_t> snapshot_id,
- std::shared_ptr<PartitionSpec> partition_spec)
- : ManifestEntryAdapter(std::move(partition_spec)), snapshot_id_(snapshot_id) {}
+ std::shared_ptr<PartitionSpec> partition_spec);
Status Init() override;
Status Append(const ManifestEntry& entry) override;
- protected:
- Result<std::shared_ptr<StructType>> GetManifestEntryType() override;
+ static std::shared_ptr<Schema> EntrySchema(std::shared_ptr<StructType> partition_type);
+ static std::shared_ptr<Schema> WrapFileSchema(std::shared_ptr<StructType> file_schema);
+ static std::shared_ptr<StructType> DataFileSchema(
+ std::shared_ptr<StructType> partition_type);
private:
std::optional<int64_t> snapshot_id_;
@@ -48,6 +49,8 @@ class ManifestFileAdapterV1 : public ManifestFileAdapter {
Status Init() override;
Status Append(const ManifestFile& file) override;
+ static const std::shared_ptr<Schema> kManifestListSchema;
+
private:
int64_t snapshot_id_;
std::optional<int64_t> parent_snapshot_id_;
diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc
index c1407b1..2545c30 100644
--- a/src/iceberg/v2_metadata.cc
+++ b/src/iceberg/v2_metadata.cc
@@ -23,44 +23,69 @@
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"
namespace iceberg {
+ManifestEntryAdapterV2::ManifestEntryAdapterV2(
+ std::optional<int64_t> snapshot_id, std::shared_ptr<PartitionSpec> partition_spec,
+ ManifestContent content)
+ : ManifestEntryAdapter(std::move(partition_spec), content),
+ snapshot_id_(snapshot_id) {}
+
+std::shared_ptr<Schema> ManifestEntryAdapterV2::EntrySchema(
+ std::shared_ptr<StructType> partition_type) {
+ return WrapFileSchema(DataFileType(std::move(partition_type)));
+}
+std::shared_ptr<Schema> ManifestEntryAdapterV2::WrapFileSchema(
+ std::shared_ptr<StructType> file_schema) {
+ return std::make_shared<Schema>(std::vector<SchemaField>{
+ ManifestEntry::kStatus,
+ ManifestEntry::kSnapshotId,
+ ManifestEntry::kSequenceNumber,
+ ManifestEntry::kFileSequenceNumber,
+ SchemaField::MakeRequired(ManifestEntry::kDataFileFieldId,
+ ManifestEntry::kDataFileField, std::move(file_schema)),
+ });
+}
+std::shared_ptr<StructType> ManifestEntryAdapterV2::DataFileType(
+ std::shared_ptr<StructType> partition_type) {
+ return std::make_shared<StructType>(std::vector<SchemaField>{
+ DataFile::kContent.AsRequired(),
+ DataFile::kFilePath,
+ DataFile::kFileFormat,
+ SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField,
+ std::move(partition_type), DataFile::kPartitionDoc),
+ DataFile::kRecordCount,
+ DataFile::kFileSize,
+ DataFile::kColumnSizes,
+ DataFile::kValueCounts,
+ DataFile::kNullValueCounts,
+ DataFile::kNanValueCounts,
+ DataFile::kLowerBounds,
+ DataFile::kUpperBounds,
+ DataFile::kKeyMetadata,
+ DataFile::kSplitOffsets,
+ DataFile::kEqualityIds,
+ DataFile::kSortOrderId,
+ DataFile::kReferencedDataFile,
+ });
+}
+
Status ManifestEntryAdapterV2::Init() {
- static std::unordered_set<int32_t> kManifestEntryFieldIds{
- ManifestEntry::kStatus.field_id(),
- ManifestEntry::kSnapshotId.field_id(),
- ManifestEntry::kSequenceNumber.field_id(),
- ManifestEntry::kFileSequenceNumber.field_id(),
- ManifestEntry::kDataFileFieldId,
- DataFile::kContent.field_id(),
- DataFile::kFilePath.field_id(),
- DataFile::kFileFormat.field_id(),
- DataFile::kPartitionFieldId,
- DataFile::kRecordCount.field_id(),
- DataFile::kFileSize.field_id(),
- DataFile::kColumnSizes.field_id(),
- DataFile::kValueCounts.field_id(),
- DataFile::kNullValueCounts.field_id(),
- DataFile::kNanValueCounts.field_id(),
- DataFile::kLowerBounds.field_id(),
- DataFile::kUpperBounds.field_id(),
- DataFile::kKeyMetadata.field_id(),
- DataFile::kSplitOffsets.field_id(),
- DataFile::kEqualityIds.field_id(),
- DataFile::kSortOrderId.field_id(),
- DataFile::kReferencedDataFile.field_id(),
- };
- ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
- ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
- if (partition_spec_ != nullptr) {
- ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
- metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
- }
+ // ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
+ metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
metadata_["format-version"] = "2";
- metadata_["content"] = "data";
- return {};
+ metadata_["content"] = content_ == ManifestContent::kData ? "data" : "delete";
+
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType());
+ if (!partition_type) {
+ partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
+ }
+ manifest_schema_ = EntrySchema(std::move(partition_type));
+ return ToArrowSchema(*manifest_schema_, &schema_);
}
Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) {
@@ -99,31 +124,35 @@ Result<std::optional<std::string>> ManifestEntryAdapterV2::GetReferenceDataFile(
return std::nullopt;
}
+const std::shared_ptr<Schema> ManifestFileAdapterV2::kManifestListSchema =
+ std::make_shared<Schema>(std::vector<SchemaField>{
+ ManifestFile::kManifestPath,
+ ManifestFile::kManifestLength,
+ ManifestFile::kPartitionSpecId,
+ ManifestFile::kContent.AsRequired(),
+ ManifestFile::kSequenceNumber.AsRequired(),
+ ManifestFile::kMinSequenceNumber.AsRequired(),
+ ManifestFile::kAddedSnapshotId,
+ ManifestFile::kAddedFilesCount.AsRequired(),
+ ManifestFile::kExistingFilesCount.AsRequired(),
+ ManifestFile::kDeletedFilesCount.AsRequired(),
+ ManifestFile::kAddedRowsCount.AsRequired(),
+ ManifestFile::kExistingRowsCount.AsRequired(),
+ ManifestFile::kDeletedRowsCount.AsRequired(),
+ ManifestFile::kPartitions,
+ ManifestFile::kKeyMetadata,
+ });
+
Status ManifestFileAdapterV2::Init() {
- static std::unordered_set<int32_t> kManifestFileFieldIds{
- ManifestFile::kManifestPath.field_id(),
- ManifestFile::kManifestLength.field_id(),
- ManifestFile::kPartitionSpecId.field_id(),
- ManifestFile::kContent.field_id(),
- ManifestFile::kSequenceNumber.field_id(),
- ManifestFile::kMinSequenceNumber.field_id(),
- ManifestFile::kAddedSnapshotId.field_id(),
- ManifestFile::kAddedFilesCount.field_id(),
- ManifestFile::kExistingFilesCount.field_id(),
- ManifestFile::kDeletedFilesCount.field_id(),
- ManifestFile::kAddedRowsCount.field_id(),
- ManifestFile::kExistingRowsCount.field_id(),
- ManifestFile::kDeletedRowsCount.field_id(),
- ManifestFile::kPartitions.field_id(),
- ManifestFile::kKeyMetadata.field_id(),
- };
metadata_["snapshot-id"] = std::to_string(snapshot_id_);
metadata_["parent-snapshot-id"] = parent_snapshot_id_.has_value()
? std::to_string(parent_snapshot_id_.value())
: "null";
metadata_["sequence-number"] = std::to_string(sequence_number_);
metadata_["format-version"] = "2";
- return InitSchema(kManifestFileFieldIds);
+
+ manifest_list_schema_ = kManifestListSchema;
+ return ToArrowSchema(*manifest_list_schema_, &schema_);
}
Status ManifestFileAdapterV2::Append(const ManifestFile& file) {
@@ -132,9 +161,12 @@ Status ManifestFileAdapterV2::Append(const ManifestFile& file) {
Result<int64_t> ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& file) const {
if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) {
+ // if the sequence number is being assigned here, then the manifest must be created by
+ // the current operation. to validate this, check that the snapshot id matches the
+ // current commit
if (snapshot_id_ != file.added_snapshot_id) {
return InvalidManifestList(
- "Found unassigned sequence number for a manifest from snapshot: %s",
+ "Found unassigned sequence number for a manifest from snapshot: {}",
file.added_snapshot_id);
}
return sequence_number_;
@@ -145,11 +177,15 @@ Result<int64_t> ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& fil
Result<int64_t> ManifestFileAdapterV2::GetMinSequenceNumber(
const ManifestFile& file) const {
if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) {
+ // same sanity check as above
if (snapshot_id_ != file.added_snapshot_id) {
return InvalidManifestList(
- "Found unassigned sequence number for a manifest from snapshot: %s",
+ "Found unassigned sequence number for a manifest from snapshot: {}",
file.added_snapshot_id);
}
+ // if the min sequence number is not determined, then there was no assigned sequence
+ // number for any file written to the wrapped manifest. replace the unassigned
+ // sequence number with the one for this commit
return sequence_number_;
}
return file.min_sequence_number;
diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h
index 164a497..1b2b43a 100644
--- a/src/iceberg/v2_metadata.h
+++ b/src/iceberg/v2_metadata.h
@@ -29,11 +29,16 @@ namespace iceberg {
class ManifestEntryAdapterV2 : public ManifestEntryAdapter {
public:
ManifestEntryAdapterV2(std::optional<int64_t> snapshot_id,
- std::shared_ptr<PartitionSpec> partition_spec)
- : ManifestEntryAdapter(std::move(partition_spec)), snapshot_id_(snapshot_id) {}
+ std::shared_ptr<PartitionSpec> partition_spec,
+ ManifestContent content);
Status Init() override;
Status Append(const ManifestEntry& entry) override;
+ static std::shared_ptr<Schema> EntrySchema(std::shared_ptr<StructType> partition_type);
+ static std::shared_ptr<Schema> WrapFileSchema(std::shared_ptr<StructType> file_schema);
+ static std::shared_ptr<StructType> DataFileType(
+ std::shared_ptr<StructType> partition_type);
+
protected:
Result<std::optional<int64_t>> GetSequenceNumber(
const ManifestEntry& entry) const override;
@@ -55,6 +60,8 @@ class ManifestFileAdapterV2 : public ManifestFileAdapter {
Status Init() override;
Status Append(const ManifestFile& file) override;
+ static const std::shared_ptr<Schema> kManifestListSchema;
+
protected:
Result<int64_t> GetSequenceNumber(const ManifestFile& file) const override;
Result<int64_t> GetMinSequenceNumber(const ManifestFile& file) const override;
diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc
index 61474f6..3b03303 100644
--- a/src/iceberg/v3_metadata.cc
+++ b/src/iceberg/v3_metadata.cc
@@ -23,47 +23,75 @@
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
#include "iceberg/util/macros.h"
namespace iceberg {
+ManifestEntryAdapterV3::ManifestEntryAdapterV3(
+ std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
+ std::shared_ptr<PartitionSpec> partition_spec, ManifestContent content)
+ : ManifestEntryAdapter(std::move(partition_spec), content),
+ snapshot_id_(snapshot_id),
+ first_row_id_(first_row_id) {}
+
+std::shared_ptr<Schema> ManifestEntryAdapterV3::EntrySchema(
+ std::shared_ptr<StructType> partition_type) {
+ return WrapFileSchema(DataFileType(std::move(partition_type)));
+}
+
+std::shared_ptr<Schema> ManifestEntryAdapterV3::WrapFileSchema(
+ std::shared_ptr<StructType> file_schema) {
+ return std::make_shared<Schema>(std::vector<SchemaField>{
+ ManifestEntry::kStatus,
+ ManifestEntry::kSnapshotId,
+ ManifestEntry::kSequenceNumber,
+ ManifestEntry::kFileSequenceNumber,
+ SchemaField::MakeRequired(ManifestEntry::kDataFileFieldId,
+ ManifestEntry::kDataFileField, std::move(file_schema)),
+ });
+}
+
+std::shared_ptr<StructType> ManifestEntryAdapterV3::DataFileType(
+ std::shared_ptr<StructType> partition_type) {
+ return std::make_shared<StructType>(std::vector<SchemaField>{
+ DataFile::kContent.AsRequired(),
+ DataFile::kFilePath,
+ DataFile::kFileFormat,
+ SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField,
+ std::move(partition_type), DataFile::kPartitionDoc),
+ DataFile::kRecordCount,
+ DataFile::kFileSize,
+ DataFile::kColumnSizes,
+ DataFile::kValueCounts,
+ DataFile::kNullValueCounts,
+ DataFile::kNanValueCounts,
+ DataFile::kLowerBounds,
+ DataFile::kUpperBounds,
+ DataFile::kKeyMetadata,
+ DataFile::kSplitOffsets,
+ DataFile::kEqualityIds,
+ DataFile::kSortOrderId,
+ DataFile::kFirstRowId,
+ DataFile::kReferencedDataFile,
+ DataFile::kContentOffset,
+ DataFile::kContentSize,
+ });
+}
+
Status ManifestEntryAdapterV3::Init() {
- static std::unordered_set<int32_t> kManifestEntryFieldIds{
- ManifestEntry::kStatus.field_id(),
- ManifestEntry::kSnapshotId.field_id(),
- ManifestEntry::kDataFileFieldId,
- ManifestEntry::kSequenceNumber.field_id(),
- ManifestEntry::kFileSequenceNumber.field_id(),
- DataFile::kContent.field_id(),
- DataFile::kFilePath.field_id(),
- DataFile::kFileFormat.field_id(),
- DataFile::kPartitionFieldId,
- DataFile::kRecordCount.field_id(),
- DataFile::kFileSize.field_id(),
- DataFile::kColumnSizes.field_id(),
- DataFile::kValueCounts.field_id(),
- DataFile::kNullValueCounts.field_id(),
- DataFile::kNanValueCounts.field_id(),
- DataFile::kLowerBounds.field_id(),
- DataFile::kUpperBounds.field_id(),
- DataFile::kKeyMetadata.field_id(),
- DataFile::kSplitOffsets.field_id(),
- DataFile::kEqualityIds.field_id(),
- DataFile::kSortOrderId.field_id(),
- DataFile::kFirstRowId.field_id(),
- DataFile::kReferencedDataFile.field_id(),
- DataFile::kContentOffset.field_id(),
- DataFile::kContentSize.field_id(),
- };
- ICEBERG_RETURN_UNEXPECTED(InitSchema(kManifestEntryFieldIds));
- ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
- if (partition_spec_ != nullptr) {
- ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
- metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
- }
+ // ICEBERG_ASSIGN_OR_RAISE(metadata_["schema"], ToJsonString(*manifest_schema_))
+ ICEBERG_ASSIGN_OR_RAISE(metadata_["partition-spec"], ToJsonString(*partition_spec_));
+ metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
metadata_["format-version"] = "3";
- metadata_["content"] = "data";
- return {};
+ metadata_["content"] = content_ == ManifestContent::kData ? "data" : "delete";
+
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType());
+ if (!partition_type) {
+ partition_type = std::make_shared<StructType>(std::vector<SchemaField>{});
+ }
+ manifest_schema_ = EntrySchema(std::move(partition_type));
+ return ToArrowSchema(*manifest_schema_, &schema_);
}
Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) {
@@ -126,25 +154,26 @@ Result<std::optional<int64_t>> ManifestEntryAdapterV3::GetContentSizeInBytes(
return std::nullopt;
}
+const std::shared_ptr<Schema> ManifestFileAdapterV3::kManifestListSchema =
+ std::make_shared<Schema>(std::vector<SchemaField>{
+ ManifestFile::kManifestPath,
+ ManifestFile::kManifestLength,
+ ManifestFile::kPartitionSpecId,
+ ManifestFile::kContent.AsRequired(),
+ ManifestFile::kSequenceNumber.AsRequired(),
+ ManifestFile::kMinSequenceNumber.AsRequired(),
+ ManifestFile::kAddedSnapshotId,
+ ManifestFile::kAddedFilesCount.AsRequired(),
+ ManifestFile::kExistingFilesCount.AsRequired(),
+ ManifestFile::kDeletedFilesCount.AsRequired(),
+ ManifestFile::kAddedRowsCount.AsRequired(),
+ ManifestFile::kExistingRowsCount.AsRequired(),
+ ManifestFile::kDeletedRowsCount.AsRequired(),
+ ManifestFile::kPartitions,
+ ManifestFile::kKeyMetadata,
+ ManifestFile::kFirstRowId,
+ });
Status ManifestFileAdapterV3::Init() {
- static std::unordered_set<int32_t> kManifestFileFieldIds{
- ManifestFile::kManifestPath.field_id(),
- ManifestFile::kManifestLength.field_id(),
- ManifestFile::kPartitionSpecId.field_id(),
- ManifestFile::kContent.field_id(),
- ManifestFile::kSequenceNumber.field_id(),
- ManifestFile::kMinSequenceNumber.field_id(),
- ManifestFile::kAddedSnapshotId.field_id(),
- ManifestFile::kAddedFilesCount.field_id(),
- ManifestFile::kExistingFilesCount.field_id(),
- ManifestFile::kDeletedFilesCount.field_id(),
- ManifestFile::kAddedRowsCount.field_id(),
- ManifestFile::kExistingRowsCount.field_id(),
- ManifestFile::kDeletedRowsCount.field_id(),
- ManifestFile::kPartitions.field_id(),
- ManifestFile::kKeyMetadata.field_id(),
- ManifestFile::kFirstRowId.field_id(),
- };
metadata_["snapshot-id"] = std::to_string(snapshot_id_);
metadata_["parent-snapshot-id"] = parent_snapshot_id_.has_value()
? std::to_string(parent_snapshot_id_.value())
@@ -153,24 +182,28 @@ Status ManifestFileAdapterV3::Init() {
metadata_["first-row-id"] =
next_row_id_.has_value() ? std::to_string(next_row_id_.value()) : "null";
metadata_["format-version"] = "3";
- return InitSchema(kManifestFileFieldIds);
+
+ manifest_list_schema_ = kManifestListSchema;
+ return ToArrowSchema(*manifest_list_schema_, &schema_);
}
Status ManifestFileAdapterV3::Append(const ManifestFile& file) {
- auto status = AppendInternal(file);
- ICEBERG_RETURN_UNEXPECTED(status);
- if (WrappedFirstRowId(file) && next_row_id_.has_value()) {
+ ICEBERG_RETURN_UNEXPECTED(AppendInternal(file));
+ if (WrapFirstRowId(file) && next_row_id_.has_value()) {
next_row_id_ = next_row_id_.value() + file.existing_rows_count.value_or(0) +
file.added_rows_count.value_or(0);
}
- return status;
+ return {};
}
Result<int64_t> ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& file) const {
if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) {
+ // if the sequence number is being assigned here, then the manifest must be created by
+ // the current operation. to validate this, check that the snapshot id matches the
+ // current commit
if (snapshot_id_ != file.added_snapshot_id) {
return InvalidManifestList(
- "Found unassigned sequence number for a manifest from snapshot: %s",
+ "Found unassigned sequence number for a manifest from snapshot: {}",
file.added_snapshot_id);
}
return sequence_number_;
@@ -181,11 +214,15 @@ Result<int64_t> ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& fil
Result<int64_t> ManifestFileAdapterV3::GetMinSequenceNumber(
const ManifestFile& file) const {
if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) {
+ // same sanity check as above
if (snapshot_id_ != file.added_snapshot_id) {
return InvalidManifestList(
- "Found unassigned sequence number for a manifest from snapshot: %s",
+ "Found unassigned sequence number for a manifest from snapshot: {}",
file.added_snapshot_id);
}
+ // if the min sequence number is not determined, then there was no assigned sequence
+ // number for any file written to the wrapped manifest. replace the unassigned
+ // sequence number with the one for this commit
return sequence_number_;
}
return file.min_sequence_number;
@@ -193,20 +230,26 @@ Result<int64_t> ManifestFileAdapterV3::GetMinSequenceNumber(
Result<std::optional<int64_t>> ManifestFileAdapterV3::GetFirstRowId(
const ManifestFile& file) const {
- if (WrappedFirstRowId(file)) {
+ if (WrapFirstRowId(file)) {
+ // if first-row-id is assigned, ensure that it is valid
+ if (!next_row_id_.has_value()) {
+ // TODO(gangwu): add ToString for ManifestFile
+ return InvalidManifestList("Found invalid first-row-id assignment: {}",
+ file.manifest_path);
+ }
return next_row_id_;
} else if (file.content != ManifestFile::Content::kData) {
return std::nullopt;
} else {
if (!file.first_row_id.has_value()) {
- return InvalidManifestList("Found unassigned first-row-id for file:{}",
+ return InvalidManifestList("Found unassigned first-row-id for file: {}",
file.manifest_path);
}
- return file.first_row_id.value();
+ return file.first_row_id;
}
}
-bool ManifestFileAdapterV3::WrappedFirstRowId(const ManifestFile& file) const {
+bool ManifestFileAdapterV3::WrapFirstRowId(const ManifestFile& file) const {
return file.content == ManifestFile::Content::kData && !file.first_row_id.has_value();
}
diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h
index a107610..421b9c0 100644
--- a/src/iceberg/v3_metadata.h
+++ b/src/iceberg/v3_metadata.h
@@ -30,13 +30,16 @@ class ManifestEntryAdapterV3 : public ManifestEntryAdapter {
public:
ManifestEntryAdapterV3(std::optional<int64_t> snapshot_id,
std::optional<int64_t> first_row_id,
- std::shared_ptr<PartitionSpec> partition_spec)
- : ManifestEntryAdapter(std::move(partition_spec)),
- snapshot_id_(snapshot_id),
- first_row_id_(first_row_id) {}
+ std::shared_ptr<PartitionSpec> partition_spec,
+ ManifestContent content);
Status Init() override;
Status Append(const ManifestEntry& entry) override;
+ static std::shared_ptr<Schema> EntrySchema(std::shared_ptr<StructType> partition_type);
+ static std::shared_ptr<Schema> WrapFileSchema(std::shared_ptr<StructType> file_schema);
+ static std::shared_ptr<StructType> DataFileType(
+ std::shared_ptr<StructType> partition_type);
+
protected:
Result<std::optional<int64_t>> GetSequenceNumber(
const ManifestEntry& entry) const override;
@@ -56,13 +59,16 @@ class ManifestEntryAdapterV3 : public ManifestEntryAdapter {
class ManifestFileAdapterV3 : public ManifestFileAdapter {
public:
ManifestFileAdapterV3(int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
- int64_t sequence_number, std::optional<int64_t> first_row_id)
+ int64_t sequence_number, int64_t first_row_id)
: snapshot_id_(snapshot_id),
parent_snapshot_id_(parent_snapshot_id),
sequence_number_(sequence_number),
next_row_id_(first_row_id) {}
Status Init() override;
Status Append(const ManifestFile& file) override;
+ std::optional<int64_t> next_row_id() const override { return next_row_id_; }
+
+ static const std::shared_ptr<Schema> kManifestListSchema;
protected:
Result<int64_t> GetSequenceNumber(const ManifestFile& file) const override;
@@ -70,7 +76,7 @@ class ManifestFileAdapterV3 : public ManifestFileAdapter {
Result<std::optional<int64_t>> GetFirstRowId(const ManifestFile& file) const override;
private:
- bool WrappedFirstRowId(const ManifestFile& file) const;
+ bool WrapFirstRowId(const ManifestFile& file) const;
private:
int64_t snapshot_id_;