Copilot commented on code in PR #708:
URL: https://github.com/apache/iceberg-cpp/pull/708#discussion_r3437281828


##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -754,27 +763,124 @@ MergingSnapshotUpdate::MergeDVs() const {
       continue;
     }
     if (dvs.size() > 1) {
-      // TODO(Guotao): Merge duplicate DVs for one referenced data file once 
C++
-      // has DVUtil/Puffin DV rewriting; Java merges them before writing 
manifests.
-      return NotImplemented(
-          "Merging multiple deletion vectors is not supported yet for 
referenced "
-          "data file: {}",
-          referenced_file);
+      if (!writer) {
+        ICEBERG_ASSIGN_OR_RAISE(merged_dv_path, 
ctx_->NewDataLocation(std::format(
+                                                    "merged-dvs-{}-{}.puffin",
+                                                    SnapshotId(), 
merged_dv_count_++)));
+        ICEBERG_ASSIGN_OR_RAISE(auto output,
+                                
ctx_->table->io()->NewOutputFile(merged_dv_path));
+        ICEBERG_ASSIGN_OR_RAISE(writer, 
puffin::PuffinWriter::Make(std::move(output)));
+      }
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto merged,
+          MergeDVsForReferencedFile(referenced_file, dvs, *writer, 
merged_dv_path));
+      merged_files.push_back(merged.file);
+      merged_dvs_.push_back(merged);
+      result.push_back(std::move(merged));
+      continue;

Review Comment:
   If `MergeDVsForReferencedFile(...)` fails after `merged_dv_path` has been 
assigned and the output file has been created, `ICEBERG_ASSIGN_OR_RAISE` 
returns early without deleting the partially written merged Puffin file. 
Because this failure can happen before any merged DV is recorded or committed, 
it can leave orphaned `merged-dvs-...puffin` files behind.
   
   Consider explicitly deleting `merged_dv_path` before propagating the error 
from `MergeDVsForReferencedFile`.



##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -754,27 +763,124 @@ MergingSnapshotUpdate::MergeDVs() const {
       continue;
     }
     if (dvs.size() > 1) {
-      // TODO(Guotao): Merge duplicate DVs for one referenced data file once 
C++
-      // has DVUtil/Puffin DV rewriting; Java merges them before writing 
manifests.
-      return NotImplemented(
-          "Merging multiple deletion vectors is not supported yet for 
referenced "
-          "data file: {}",
-          referenced_file);
+      if (!writer) {
+        ICEBERG_ASSIGN_OR_RAISE(merged_dv_path, 
ctx_->NewDataLocation(std::format(
+                                                    "merged-dvs-{}-{}.puffin",
+                                                    SnapshotId(), 
merged_dv_count_++)));
+        ICEBERG_ASSIGN_OR_RAISE(auto output,
+                                
ctx_->table->io()->NewOutputFile(merged_dv_path));
+        ICEBERG_ASSIGN_OR_RAISE(writer, 
puffin::PuffinWriter::Make(std::move(output)));
+      }
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto merged,
+          MergeDVsForReferencedFile(referenced_file, dvs, *writer, 
merged_dv_path));
+      merged_files.push_back(merged.file);
+      merged_dvs_.push_back(merged);
+      result.push_back(std::move(merged));
+      continue;
     }
 
     result.push_back(dvs.front());
   }
 
+  if (writer) {
+    ICEBERG_RETURN_UNEXPECTED(writer->Finish());
+    ICEBERG_ASSIGN_OR_RAISE(auto file_size, writer->FileSize());
+    for (const auto& file : merged_files) {
+      file->file_size_in_bytes = file_size;
+    }
+  }
+
   return result;
 }
 
+Result<MergingSnapshotUpdate::PendingDeleteFile>
+MergingSnapshotUpdate::MergeDVsForReferencedFile(
+    const std::string& referenced_file, const std::vector<PendingDeleteFile>& 
dvs,
+    puffin::PuffinWriter& writer, const std::string& path) {
+  const auto& first_file = dvs.front().file;
+  const auto first_data_sequence_number = dvs.front().data_sequence_number;
+  const auto first_spec_id = first_file->partition_spec_id;
+  RoaringPositionBitmap bitmap;
+
+  for (const auto& dv : dvs) {
+    ICEBERG_PRECHECK(dv.data_sequence_number == first_data_sequence_number,
+                     "Cannot merge DVs, mismatched sequence numbers for {}",
+                     referenced_file);
+    ICEBERG_PRECHECK(dv.file->partition_spec_id == first_spec_id,
+                     "Cannot merge DVs, mismatched partition specs for {}",
+                     referenced_file);
+    ICEBERG_PRECHECK(dv.file->partition == first_file->partition,
+                     "Cannot merge DVs, mismatched partition tuples for {}",
+                     referenced_file);
+    ICEBERG_PRECHECK(dv.file->content_offset.has_value(),
+                     "DV must have a content offset: {}", dv.file->file_path);
+    ICEBERG_PRECHECK(dv.file->content_size_in_bytes.has_value(),
+                     "DV must have a content size: {}", dv.file->file_path);
+
+    ICEBERG_ASSIGN_OR_RAISE(auto input,
+                            
ctx_->table->io()->NewInputFile(dv.file->file_path));
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            puffin::PuffinReader::Make(std::move(input), 
std::nullopt,
+                                                       
dv.file->file_size_in_bytes));

Review Comment:
   `PuffinReader::Make` takes an *optional* file size hint. Passing 
`dv.file->file_size_in_bytes` unconditionally means that a default or unknown 
value of `0` is treated as a known file size. The reader can then fail even 
though it could have determined the actual size itself.
   
   Pass `std::nullopt` when `file_size_in_bytes` is not set to a meaningful 
value (e.g. `<= 0`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to