zhjwpku commented on code in PR #293:
URL: https://github.com/apache/iceberg-cpp/pull/293#discussion_r2506299434


##########
src/iceberg/test/manifest_list_versions_test.cc:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <optional>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_reader.h"
+#include "iceberg/manifest_writer.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/v1_metadata.h"
+
+namespace iceberg {
+
+constexpr int kRowLineageFormatVersion = 3;
+constexpr const char* kPath = "s3://bucket/table/m1.avro";
+constexpr int64_t kLength = 1024L;
+constexpr int32_t kSpecId = 1;
+constexpr int64_t kSeqNum = 34L;
+constexpr int64_t kMinSeqNum = 10L;
+constexpr int64_t kSnapshotId = 987134631982734L;
+constexpr int32_t kAddedFiles = 2;
+constexpr int64_t kAddedRows = 5292L;
+constexpr int32_t kExistingFiles = 343;
+constexpr int64_t kExistingRows = 857273L;
+constexpr int32_t kDeletedFiles = 1;
+constexpr int64_t kDeletedRows = 22910L;
+constexpr int64_t kFirstRowId = 100L;
+constexpr int64_t kSnapshotFirstRowId = 130L;
+
+const static auto kTestManifest = ManifestFile{
+    .manifest_path = kPath,
+    .manifest_length = kLength,
+    .partition_spec_id = kSpecId,
+    .content = ManifestFile::Content::kData,
+    .sequence_number = kSeqNum,
+    .min_sequence_number = kMinSeqNum,
+    .added_snapshot_id = kSnapshotId,
+    .added_files_count = kAddedFiles,
+    .existing_files_count = kExistingFiles,
+    .deleted_files_count = kDeletedFiles,
+    .added_rows_count = kAddedRows,
+    .existing_rows_count = kExistingRows,
+    .deleted_rows_count = kDeletedRows,
+    .partitions = {},
+    .key_metadata = {},
+    .first_row_id = kFirstRowId,
+};
+
+const static auto kDeleteManifest = ManifestFile{
+    .manifest_path = kPath,
+    .manifest_length = kLength,
+    .partition_spec_id = kSpecId,
+    .content = ManifestFile::Content::kDeletes,
+    .sequence_number = kSeqNum,
+    .min_sequence_number = kMinSeqNum,
+    .added_snapshot_id = kSnapshotId,
+    .added_files_count = kAddedFiles,
+    .existing_files_count = kExistingFiles,
+    .deleted_files_count = kDeletedFiles,
+    .added_rows_count = kAddedRows,
+    .existing_rows_count = kExistingRows,
+    .deleted_rows_count = kDeletedRows,
+    .first_row_id = std::nullopt,
+};
+
+class TestManifestListVersions : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    avro::RegisterAll();
+    file_io_ = iceberg::arrow::MakeMockFileIO();
+  }
+
+  static std::string CreateManifestListPath() {
+    return std::format("manifest-list-{}.avro",
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  std::string WriteManifestList(int format_version, int64_t 
expected_next_row_id,
+                                const std::vector<ManifestFile>& manifests) 
const {
+    const std::string manifest_list_path = CreateManifestListPath();
+    constexpr int64_t kParentSnapshotId = kSnapshotId - 1;
+
+    Result<std::unique_ptr<ManifestListWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    if (format_version == 1) {
+      writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, 
kParentSnapshotId,
+                                                       manifest_list_path, 
file_io_);
+    } else if (format_version == 2) {
+      writer_result = ManifestListWriter::MakeV2Writer(
+          kSnapshotId, kParentSnapshotId, kSeqNum, manifest_list_path, 
file_io_);
+    } else if (format_version == 3) {
+      writer_result = ManifestListWriter::MakeV3Writer(kSnapshotId, 
kParentSnapshotId,
+                                                       kSeqNum, 
kSnapshotFirstRowId,
+                                                       manifest_list_path, 
file_io_);
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    EXPECT_THAT(writer->AddAll(manifests), IsOk());
+    EXPECT_THAT(writer->Close(), IsOk());
+
+    if (format_version >= kRowLineageFormatVersion) {
+      EXPECT_EQ(writer->next_row_id(), 
std::make_optional(expected_next_row_id));
+    } else {
+      EXPECT_FALSE(writer->next_row_id().has_value());
+    }
+
+    return manifest_list_path;
+  }
+
+  ManifestFile WriteAndReadManifestList(int format_version) const {
+    return ReadManifestList(
+        WriteManifestList(format_version, kSnapshotFirstRowId, 
{kTestManifest}));
+  }
+
+  ManifestFile ReadManifestList(const std::string& manifest_list_path) const {
+    auto reader_result = ManifestListReader::Make(manifest_list_path, 
file_io_);
+    EXPECT_THAT(reader_result, IsOk());
+
+    auto reader = std::move(reader_result.value());
+    auto files_result = reader->Files();
+    EXPECT_THAT(files_result, IsOk());
+
+    auto manifests = files_result.value();
+    EXPECT_EQ(manifests.size(), 1);
+
+    return manifests[0];
+  }
+
+  std::vector<ManifestFile> ReadAllManifests(
+      const std::string& manifest_list_path) const {
+    auto reader_result = ManifestListReader::Make(manifest_list_path, 
file_io_);
+    EXPECT_THAT(reader_result, IsOk());
+
+    auto reader = std::move(reader_result.value());
+    auto files_result = reader->Files();
+    EXPECT_THAT(files_result, IsOk());
+
+    return files_result.value();
+  }
+
+  void ReadAvro(const std::string& path, const std::shared_ptr<Schema>& schema,
+                const std::string& expected_json) const {
+    auto reader_result = ReaderFactoryRegistry::Open(
+        FileFormatType::kAvro, {.path = path, .io = file_io_, .projection = 
schema});
+    EXPECT_THAT(reader_result, IsOk());
+    auto reader = std::move(reader_result.value());
+
+    auto arrow_schema_result = reader->Schema();
+    EXPECT_THAT(arrow_schema_result, IsOk());
+    auto arrow_c_schema = std::move(arrow_schema_result.value());
+    auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+    auto expected_array =
+        ::arrow::json::ArrayFromJSONString(arrow_schema, 
expected_json).ValueOrDie();
+
+    auto batch_result = reader->Next();
+    EXPECT_THAT(batch_result, IsOk());
+    EXPECT_TRUE(batch_result.value().has_value());
+    auto arrow_c_batch = std::move(batch_result.value().value());
+
+    auto arrow_batch_result =
+        ::arrow::ImportArray(&arrow_c_batch, std::move(arrow_schema));
+    auto array = arrow_batch_result.ValueOrDie();
+    EXPECT_TRUE(array != nullptr);
+    EXPECT_TRUE(expected_array->Equals(*array));
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+};
+
+TEST_F(TestManifestListVersions, TestV1WriteDeleteManifest) {
+  const std::string manifest_list_path = CreateManifestListPath();
+
+  auto writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, 
kSnapshotId - 1,
+                                                        manifest_list_path, 
file_io_);
+  EXPECT_THAT(writer_result, IsOk());
+
+  auto writer = std::move(writer_result.value());
+  auto status = writer->Add(kDeleteManifest);
+
+  EXPECT_THAT(status, IsError(ErrorKind::kInvalidManifestList));
+  EXPECT_THAT(status, HasErrorMessage("Cannot store delete manifests in a v1 
table"));
+}
+
+TEST_F(TestManifestListVersions, TestV1Write) {
+  auto manifest = WriteAndReadManifestList(1);

Review Comment:
   Nit:
   ```suggestion
     auto manifest = WriteAndReadManifestList(/*format_version=*/1);
   ```



##########
src/iceberg/test/manifest_list_versions_test.cc:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <optional>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_reader.h"
+#include "iceberg/manifest_writer.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/v1_metadata.h"
+
+namespace iceberg {
+
+constexpr int kRowLineageFormatVersion = 3;
+constexpr const char* kPath = "s3://bucket/table/m1.avro";
+constexpr int64_t kLength = 1024L;
+constexpr int32_t kSpecId = 1;
+constexpr int64_t kSeqNum = 34L;
+constexpr int64_t kMinSeqNum = 10L;
+constexpr int64_t kSnapshotId = 987134631982734L;
+constexpr int32_t kAddedFiles = 2;
+constexpr int64_t kAddedRows = 5292L;
+constexpr int32_t kExistingFiles = 343;
+constexpr int64_t kExistingRows = 857273L;
+constexpr int32_t kDeletedFiles = 1;
+constexpr int64_t kDeletedRows = 22910L;
+constexpr int64_t kFirstRowId = 100L;
+constexpr int64_t kSnapshotFirstRowId = 130L;
+
+const static auto kTestManifest = ManifestFile{
+    .manifest_path = kPath,
+    .manifest_length = kLength,
+    .partition_spec_id = kSpecId,
+    .content = ManifestFile::Content::kData,
+    .sequence_number = kSeqNum,
+    .min_sequence_number = kMinSeqNum,
+    .added_snapshot_id = kSnapshotId,
+    .added_files_count = kAddedFiles,
+    .existing_files_count = kExistingFiles,
+    .deleted_files_count = kDeletedFiles,
+    .added_rows_count = kAddedRows,
+    .existing_rows_count = kExistingRows,
+    .deleted_rows_count = kDeletedRows,
+    .partitions = {},
+    .key_metadata = {},
+    .first_row_id = kFirstRowId,
+};
+
+const static auto kDeleteManifest = ManifestFile{
+    .manifest_path = kPath,
+    .manifest_length = kLength,
+    .partition_spec_id = kSpecId,
+    .content = ManifestFile::Content::kDeletes,
+    .sequence_number = kSeqNum,
+    .min_sequence_number = kMinSeqNum,
+    .added_snapshot_id = kSnapshotId,
+    .added_files_count = kAddedFiles,
+    .existing_files_count = kExistingFiles,
+    .deleted_files_count = kDeletedFiles,
+    .added_rows_count = kAddedRows,
+    .existing_rows_count = kExistingRows,
+    .deleted_rows_count = kDeletedRows,
+    .first_row_id = std::nullopt,
+};
+
+class TestManifestListVersions : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    avro::RegisterAll();
+    file_io_ = iceberg::arrow::MakeMockFileIO();
+  }
+
+  static std::string CreateManifestListPath() {
+    return std::format("manifest-list-{}.avro",
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  std::string WriteManifestList(int format_version, int64_t 
expected_next_row_id,
+                                const std::vector<ManifestFile>& manifests) 
const {
+    const std::string manifest_list_path = CreateManifestListPath();
+    constexpr int64_t kParentSnapshotId = kSnapshotId - 1;
+
+    Result<std::unique_ptr<ManifestListWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    if (format_version == 1) {
+      writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, 
kParentSnapshotId,
+                                                       manifest_list_path, 
file_io_);
+    } else if (format_version == 2) {
+      writer_result = ManifestListWriter::MakeV2Writer(
+          kSnapshotId, kParentSnapshotId, kSeqNum, manifest_list_path, 
file_io_);
+    } else if (format_version == 3) {
+      writer_result = ManifestListWriter::MakeV3Writer(kSnapshotId, 
kParentSnapshotId,
+                                                       kSeqNum, 
kSnapshotFirstRowId,
+                                                       manifest_list_path, 
file_io_);
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    EXPECT_THAT(writer->AddAll(manifests), IsOk());
+    EXPECT_THAT(writer->Close(), IsOk());
+
+    if (format_version >= kRowLineageFormatVersion) {
+      EXPECT_EQ(writer->next_row_id(), 
std::make_optional(expected_next_row_id));
+    } else {
+      EXPECT_FALSE(writer->next_row_id().has_value());
+    }
+
+    return manifest_list_path;
+  }
+
+  ManifestFile WriteAndReadManifestList(int format_version) const {
+    return ReadManifestList(
+        WriteManifestList(format_version, kSnapshotFirstRowId, 
{kTestManifest}));
+  }
+
+  ManifestFile ReadManifestList(const std::string& manifest_list_path) const {
+    auto reader_result = ManifestListReader::Make(manifest_list_path, 
file_io_);
+    EXPECT_THAT(reader_result, IsOk());
+
+    auto reader = std::move(reader_result.value());
+    auto files_result = reader->Files();
+    EXPECT_THAT(files_result, IsOk());
+
+    auto manifests = files_result.value();
+    EXPECT_EQ(manifests.size(), 1);
+
+    return manifests[0];
+  }
+
+  std::vector<ManifestFile> ReadAllManifests(
+      const std::string& manifest_list_path) const {
+    auto reader_result = ManifestListReader::Make(manifest_list_path, 
file_io_);
+    EXPECT_THAT(reader_result, IsOk());
+
+    auto reader = std::move(reader_result.value());
+    auto files_result = reader->Files();
+    EXPECT_THAT(files_result, IsOk());
+
+    return files_result.value();
+  }
+
+  void ReadAvro(const std::string& path, const std::shared_ptr<Schema>& schema,
+                const std::string& expected_json) const {
+    auto reader_result = ReaderFactoryRegistry::Open(
+        FileFormatType::kAvro, {.path = path, .io = file_io_, .projection = 
schema});
+    EXPECT_THAT(reader_result, IsOk());
+    auto reader = std::move(reader_result.value());
+
+    auto arrow_schema_result = reader->Schema();
+    EXPECT_THAT(arrow_schema_result, IsOk());
+    auto arrow_c_schema = std::move(arrow_schema_result.value());
+    auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+    auto expected_array =
+        ::arrow::json::ArrayFromJSONString(arrow_schema, 
expected_json).ValueOrDie();
+
+    auto batch_result = reader->Next();
+    EXPECT_THAT(batch_result, IsOk());
+    EXPECT_TRUE(batch_result.value().has_value());
+    auto arrow_c_batch = std::move(batch_result.value().value());
+
+    auto arrow_batch_result =
+        ::arrow::ImportArray(&arrow_c_batch, std::move(arrow_schema));
+    auto array = arrow_batch_result.ValueOrDie();
+    EXPECT_TRUE(array != nullptr);
+    EXPECT_TRUE(expected_array->Equals(*array));
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+};
+
+TEST_F(TestManifestListVersions, TestV1WriteDeleteManifest) {
+  const std::string manifest_list_path = CreateManifestListPath();
+
+  auto writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, 
kSnapshotId - 1,
+                                                        manifest_list_path, 
file_io_);
+  EXPECT_THAT(writer_result, IsOk());
+
+  auto writer = std::move(writer_result.value());
+  auto status = writer->Add(kDeleteManifest);
+
+  EXPECT_THAT(status, IsError(ErrorKind::kInvalidManifestList));
+  EXPECT_THAT(status, HasErrorMessage("Cannot store delete manifests in a v1 
table"));
+}
+
+TEST_F(TestManifestListVersions, TestV1Write) {
+  auto manifest = WriteAndReadManifestList(1);
+
+  // V3 fields are not written and are defaulted
+  EXPECT_FALSE(manifest.first_row_id.has_value());
+
+  // V2 fields are not written and are defaulted
+  EXPECT_EQ(manifest.sequence_number, 0);
+  EXPECT_EQ(manifest.min_sequence_number, 0);
+
+  // V1 fields are read correctly
+  EXPECT_EQ(manifest.manifest_path, kPath);
+  EXPECT_EQ(manifest.manifest_length, kLength);
+  EXPECT_EQ(manifest.partition_spec_id, kSpecId);
+  EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+  EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
+  EXPECT_EQ(manifest.added_files_count, kAddedFiles);
+  EXPECT_EQ(manifest.existing_files_count, kExistingFiles);
+  EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles);
+  EXPECT_EQ(manifest.added_rows_count, kAddedRows);
+  EXPECT_EQ(manifest.existing_rows_count, kExistingRows);
+  EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows);
+}
+
+TEST_F(TestManifestListVersions, TestV2Write) {
+  auto manifest = WriteAndReadManifestList(2);

Review Comment:
   ```suggestion
     auto manifest = WriteAndReadManifestList(/*format_version=*/2);
   ```



##########
src/iceberg/v2_metadata.h:
##########
@@ -34,6 +34,10 @@ class ManifestEntryAdapterV2 : public ManifestEntryAdapter {
   Status Init() override;
   Status Append(const ManifestEntry& entry) override;
 
+  static std::shared_ptr<Schema> EntrySchema(std::shared_ptr<StructType> 
partition_type);
+  static std::shared_ptr<Schema> WrapFileSchema(std::shared_ptr<StructType> 
file_schema);
+  static std::shared_ptr<StructType> FileType(std::shared_ptr<StructType> 
partition_type);

Review Comment:
   It's a little strange that V1 uses the name `DataFileSchema` but V2/V3 use
   `FileType`. Isn't `DataFileSchema` the more precise name?



##########
src/iceberg/test/manifest_list_versions_test.cc:
##########
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <optional>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_reader.h"
+#include "iceberg/manifest_writer.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/v1_metadata.h"
+
+namespace iceberg {
+
+constexpr int kRowLineageFormatVersion = 3;
+constexpr const char* kPath = "s3://bucket/table/m1.avro";
+constexpr int64_t kLength = 1024L;
+constexpr int32_t kSpecId = 1;
+constexpr int64_t kSeqNum = 34L;
+constexpr int64_t kMinSeqNum = 10L;
+constexpr int64_t kSnapshotId = 987134631982734L;
+constexpr int32_t kAddedFiles = 2;
+constexpr int64_t kAddedRows = 5292L;
+constexpr int32_t kExistingFiles = 343;
+constexpr int64_t kExistingRows = 857273L;
+constexpr int32_t kDeletedFiles = 1;
+constexpr int64_t kDeletedRows = 22910L;
+constexpr int64_t kFirstRowId = 100L;
+constexpr int64_t kSnapshotFirstRowId = 130L;
+
+const static auto kTestManifest = ManifestFile{
+    .manifest_path = kPath,
+    .manifest_length = kLength,
+    .partition_spec_id = kSpecId,
+    .content = ManifestFile::Content::kData,
+    .sequence_number = kSeqNum,
+    .min_sequence_number = kMinSeqNum,
+    .added_snapshot_id = kSnapshotId,
+    .added_files_count = kAddedFiles,
+    .existing_files_count = kExistingFiles,
+    .deleted_files_count = kDeletedFiles,
+    .added_rows_count = kAddedRows,
+    .existing_rows_count = kExistingRows,
+    .deleted_rows_count = kDeletedRows,
+    .partitions = {},
+    .key_metadata = {},
+    .first_row_id = kFirstRowId,
+};
+
+const static auto kDeleteManifest = ManifestFile{
+    .manifest_path = kPath,
+    .manifest_length = kLength,
+    .partition_spec_id = kSpecId,
+    .content = ManifestFile::Content::kDeletes,
+    .sequence_number = kSeqNum,
+    .min_sequence_number = kMinSeqNum,
+    .added_snapshot_id = kSnapshotId,
+    .added_files_count = kAddedFiles,
+    .existing_files_count = kExistingFiles,
+    .deleted_files_count = kDeletedFiles,
+    .added_rows_count = kAddedRows,
+    .existing_rows_count = kExistingRows,
+    .deleted_rows_count = kDeletedRows,
+    .first_row_id = std::nullopt,
+};
+
+class TestManifestListVersions : public ::testing::Test {
+ protected:
+  void SetUp() override {
+    avro::RegisterAll();
+    file_io_ = iceberg::arrow::MakeMockFileIO();
+  }
+
+  static std::string CreateManifestListPath() {
+    return std::format("manifest-list-{}.avro",
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  std::string WriteManifestList(int format_version, int64_t 
expected_next_row_id,
+                                const std::vector<ManifestFile>& manifests) 
const {
+    const std::string manifest_list_path = CreateManifestListPath();
+    constexpr int64_t kParentSnapshotId = kSnapshotId - 1;
+
+    Result<std::unique_ptr<ManifestListWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    if (format_version == 1) {
+      writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, 
kParentSnapshotId,
+                                                       manifest_list_path, 
file_io_);
+    } else if (format_version == 2) {
+      writer_result = ManifestListWriter::MakeV2Writer(
+          kSnapshotId, kParentSnapshotId, kSeqNum, manifest_list_path, 
file_io_);
+    } else if (format_version == 3) {
+      writer_result = ManifestListWriter::MakeV3Writer(kSnapshotId, 
kParentSnapshotId,
+                                                       kSeqNum, 
kSnapshotFirstRowId,
+                                                       manifest_list_path, 
file_io_);
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    EXPECT_THAT(writer->AddAll(manifests), IsOk());
+    EXPECT_THAT(writer->Close(), IsOk());
+
+    if (format_version >= kRowLineageFormatVersion) {
+      EXPECT_EQ(writer->next_row_id(), 
std::make_optional(expected_next_row_id));
+    } else {
+      EXPECT_FALSE(writer->next_row_id().has_value());
+    }
+
+    return manifest_list_path;
+  }
+
+  ManifestFile WriteAndReadManifestList(int format_version) const {
+    return ReadManifestList(
+        WriteManifestList(format_version, kSnapshotFirstRowId, 
{kTestManifest}));
+  }
+
+  ManifestFile ReadManifestList(const std::string& manifest_list_path) const {
+    auto reader_result = ManifestListReader::Make(manifest_list_path, 
file_io_);
+    EXPECT_THAT(reader_result, IsOk());
+
+    auto reader = std::move(reader_result.value());
+    auto files_result = reader->Files();
+    EXPECT_THAT(files_result, IsOk());
+
+    auto manifests = files_result.value();
+    EXPECT_EQ(manifests.size(), 1);
+
+    return manifests[0];
+  }
+
+  std::vector<ManifestFile> ReadAllManifests(
+      const std::string& manifest_list_path) const {
+    auto reader_result = ManifestListReader::Make(manifest_list_path, 
file_io_);
+    EXPECT_THAT(reader_result, IsOk());
+
+    auto reader = std::move(reader_result.value());
+    auto files_result = reader->Files();
+    EXPECT_THAT(files_result, IsOk());
+
+    return files_result.value();
+  }
+
+  void ReadAvro(const std::string& path, const std::shared_ptr<Schema>& schema,
+                const std::string& expected_json) const {
+    auto reader_result = ReaderFactoryRegistry::Open(
+        FileFormatType::kAvro, {.path = path, .io = file_io_, .projection = 
schema});
+    EXPECT_THAT(reader_result, IsOk());
+    auto reader = std::move(reader_result.value());
+
+    auto arrow_schema_result = reader->Schema();
+    EXPECT_THAT(arrow_schema_result, IsOk());
+    auto arrow_c_schema = std::move(arrow_schema_result.value());
+    auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+    auto expected_array =
+        ::arrow::json::ArrayFromJSONString(arrow_schema, 
expected_json).ValueOrDie();
+
+    auto batch_result = reader->Next();
+    EXPECT_THAT(batch_result, IsOk());
+    EXPECT_TRUE(batch_result.value().has_value());
+    auto arrow_c_batch = std::move(batch_result.value().value());
+
+    auto arrow_batch_result =
+        ::arrow::ImportArray(&arrow_c_batch, std::move(arrow_schema));
+    auto array = arrow_batch_result.ValueOrDie();
+    EXPECT_TRUE(array != nullptr);
+    EXPECT_TRUE(expected_array->Equals(*array));
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+};
+
+TEST_F(TestManifestListVersions, TestV1WriteDeleteManifest) {
+  const std::string manifest_list_path = CreateManifestListPath();
+
+  auto writer_result = ManifestListWriter::MakeV1Writer(kSnapshotId, 
kSnapshotId - 1,
+                                                        manifest_list_path, 
file_io_);
+  EXPECT_THAT(writer_result, IsOk());
+
+  auto writer = std::move(writer_result.value());
+  auto status = writer->Add(kDeleteManifest);
+
+  EXPECT_THAT(status, IsError(ErrorKind::kInvalidManifestList));
+  EXPECT_THAT(status, HasErrorMessage("Cannot store delete manifests in a v1 
table"));
+}
+
+TEST_F(TestManifestListVersions, TestV1Write) {
+  auto manifest = WriteAndReadManifestList(1);
+
+  // V3 fields are not written and are defaulted
+  EXPECT_FALSE(manifest.first_row_id.has_value());
+
+  // V2 fields are not written and are defaulted
+  EXPECT_EQ(manifest.sequence_number, 0);
+  EXPECT_EQ(manifest.min_sequence_number, 0);
+
+  // V1 fields are read correctly
+  EXPECT_EQ(manifest.manifest_path, kPath);
+  EXPECT_EQ(manifest.manifest_length, kLength);
+  EXPECT_EQ(manifest.partition_spec_id, kSpecId);
+  EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+  EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
+  EXPECT_EQ(manifest.added_files_count, kAddedFiles);
+  EXPECT_EQ(manifest.existing_files_count, kExistingFiles);
+  EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles);
+  EXPECT_EQ(manifest.added_rows_count, kAddedRows);
+  EXPECT_EQ(manifest.existing_rows_count, kExistingRows);
+  EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows);
+}
+
+TEST_F(TestManifestListVersions, TestV2Write) {
+  auto manifest = WriteAndReadManifestList(2);
+
+  // V3 fields are not written and are defaulted
+  EXPECT_FALSE(manifest.first_row_id.has_value());
+
+  // All V2 fields should be read correctly
+  EXPECT_EQ(manifest.manifest_path, kPath);
+  EXPECT_EQ(manifest.manifest_length, kLength);
+  EXPECT_EQ(manifest.partition_spec_id, kSpecId);
+  EXPECT_EQ(manifest.content, ManifestFile::Content::kData);
+  EXPECT_EQ(manifest.sequence_number, kSeqNum);
+  EXPECT_EQ(manifest.min_sequence_number, kMinSeqNum);
+  EXPECT_EQ(manifest.added_snapshot_id, kSnapshotId);
+  EXPECT_EQ(manifest.added_files_count, kAddedFiles);
+  EXPECT_EQ(manifest.added_rows_count, kAddedRows);
+  EXPECT_EQ(manifest.existing_files_count, kExistingFiles);
+  EXPECT_EQ(manifest.existing_rows_count, kExistingRows);
+  EXPECT_EQ(manifest.deleted_files_count, kDeletedFiles);
+  EXPECT_EQ(manifest.deleted_rows_count, kDeletedRows);
+}
+
+TEST_F(TestManifestListVersions, TestV3Write) {
+  auto manifest = WriteAndReadManifestList(3);

Review Comment:
   ```suggestion
     auto manifest = WriteAndReadManifestList(/*format_version=*/3);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to