Copilot commented on code in PR #708:
URL: https://github.com/apache/iceberg-cpp/pull/708#discussion_r3437281828
##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -754,27 +763,124 @@ MergingSnapshotUpdate::MergeDVs() const {
continue;
}
if (dvs.size() > 1) {
- // TODO(Guotao): Merge duplicate DVs for one referenced data file once
C++
- // has DVUtil/Puffin DV rewriting; Java merges them before writing
manifests.
- return NotImplemented(
- "Merging multiple deletion vectors is not supported yet for
referenced "
- "data file: {}",
- referenced_file);
+ if (!writer) {
+ ICEBERG_ASSIGN_OR_RAISE(merged_dv_path,
ctx_->NewDataLocation(std::format(
+ "merged-dvs-{}-{}.puffin",
+ SnapshotId(),
merged_dv_count_++)));
+ ICEBERG_ASSIGN_OR_RAISE(auto output,
+
ctx_->table->io()->NewOutputFile(merged_dv_path));
+ ICEBERG_ASSIGN_OR_RAISE(writer,
puffin::PuffinWriter::Make(std::move(output)));
+ }
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto merged,
+ MergeDVsForReferencedFile(referenced_file, dvs, *writer,
merged_dv_path));
+ merged_files.push_back(merged.file);
+ merged_dvs_.push_back(merged);
+ result.push_back(std::move(merged));
+ continue;
Review Comment:
If `MergeDVsForReferencedFile(...)` fails after `merged_dv_path` and the
output file have been created, `ICEBERG_ASSIGN_OR_RAISE` will return early
without deleting the partially written merged Puffin file. Because this failure
can happen before any merged DV is recorded or committed, it can leave orphaned
`merged-dvs-...puffin` files behind.
Consider explicitly deleting `merged_dv_path` before propagating the error
from `MergeDVsForReferencedFile`.
##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -754,27 +763,124 @@ MergingSnapshotUpdate::MergeDVs() const {
continue;
}
if (dvs.size() > 1) {
- // TODO(Guotao): Merge duplicate DVs for one referenced data file once
C++
- // has DVUtil/Puffin DV rewriting; Java merges them before writing
manifests.
- return NotImplemented(
- "Merging multiple deletion vectors is not supported yet for
referenced "
- "data file: {}",
- referenced_file);
+ if (!writer) {
+ ICEBERG_ASSIGN_OR_RAISE(merged_dv_path,
ctx_->NewDataLocation(std::format(
+ "merged-dvs-{}-{}.puffin",
+ SnapshotId(),
merged_dv_count_++)));
+ ICEBERG_ASSIGN_OR_RAISE(auto output,
+
ctx_->table->io()->NewOutputFile(merged_dv_path));
+ ICEBERG_ASSIGN_OR_RAISE(writer,
puffin::PuffinWriter::Make(std::move(output)));
+ }
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto merged,
+ MergeDVsForReferencedFile(referenced_file, dvs, *writer,
merged_dv_path));
+ merged_files.push_back(merged.file);
+ merged_dvs_.push_back(merged);
+ result.push_back(std::move(merged));
+ continue;
}
result.push_back(dvs.front());
}
+ if (writer) {
+ ICEBERG_RETURN_UNEXPECTED(writer->Finish());
+ ICEBERG_ASSIGN_OR_RAISE(auto file_size, writer->FileSize());
+ for (const auto& file : merged_files) {
+ file->file_size_in_bytes = file_size;
+ }
+ }
+
return result;
}
+Result<MergingSnapshotUpdate::PendingDeleteFile>
+MergingSnapshotUpdate::MergeDVsForReferencedFile(
+ const std::string& referenced_file, const std::vector<PendingDeleteFile>&
dvs,
+ puffin::PuffinWriter& writer, const std::string& path) {
+ const auto& first_file = dvs.front().file;
+ const auto first_data_sequence_number = dvs.front().data_sequence_number;
+ const auto first_spec_id = first_file->partition_spec_id;
+ RoaringPositionBitmap bitmap;
+
+ for (const auto& dv : dvs) {
+ ICEBERG_PRECHECK(dv.data_sequence_number == first_data_sequence_number,
+ "Cannot merge DVs, mismatched sequence numbers for {}",
+ referenced_file);
+ ICEBERG_PRECHECK(dv.file->partition_spec_id == first_spec_id,
+ "Cannot merge DVs, mismatched partition specs for {}",
+ referenced_file);
+ ICEBERG_PRECHECK(dv.file->partition == first_file->partition,
+ "Cannot merge DVs, mismatched partition tuples for {}",
+ referenced_file);
+ ICEBERG_PRECHECK(dv.file->content_offset.has_value(),
+ "DV must have a content offset: {}", dv.file->file_path);
+ ICEBERG_PRECHECK(dv.file->content_size_in_bytes.has_value(),
+ "DV must have a content size: {}", dv.file->file_path);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto input,
+
ctx_->table->io()->NewInputFile(dv.file->file_path));
+ ICEBERG_ASSIGN_OR_RAISE(auto reader,
+ puffin::PuffinReader::Make(std::move(input),
std::nullopt,
+
dv.file->file_size_in_bytes));
Review Comment:
`PuffinReader::Make` takes an *optional* file size hint. Passing
`dv.file->file_size_in_bytes` unconditionally means that a default/unknown value
of `0` will be treated as a known file size, which can cause the reader to fail
even though it could have fetched the actual size itself.
Pass `std::nullopt` when `file_size_in_bytes` is not set to a meaningful
value (e.g. when it is `<= 0`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]