This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 69cf2d3f feat: Implement PositionDeleteWriter for position delete files (#582)
69cf2d3f is described below
commit 69cf2d3f6735974cb7674ff1b6fc87b3b6d91948
Author: Xinli Shang <[email protected]>
AuthorDate: Sat Mar 14 19:43:00 2026 -0700
feat: Implement PositionDeleteWriter for position delete files (#582)
Implement the PositionDeleteWriter following the same PIMPL pattern as
DataWriter. The writer supports both buffered WriteDelete(file_path,
pos) calls and direct Write(ArrowArray*) for pre-formed batches.
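Metadata() reports content=kPositionDeletes with sort_order_id=nullopt per
spec, and sets referenced_data_file when all deletes buffered through
WriteDelete target a single data file; paths inside batches passed to
Write(ArrowArray*) are not tracked yet (see the TODO in Impl::Write).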
---------
Co-authored-by: shangxinli <[email protected]>
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
src/iceberg/arrow_c_data_guard_internal.cc | 4 +-
src/iceberg/data/position_delete_writer.cc | 203 ++++++++++++++++++++++++++++-
src/iceberg/data/position_delete_writer.h | 8 +-
src/iceberg/test/data_writer_test.cc | 159 ++++++++++++++++++++++
4 files changed, 366 insertions(+), 8 deletions(-)
diff --git a/src/iceberg/arrow_c_data_guard_internal.cc b/src/iceberg/arrow_c_data_guard_internal.cc
index 5fb3f9fa..fd421707 100644
--- a/src/iceberg/arrow_c_data_guard_internal.cc
+++ b/src/iceberg/arrow_c_data_guard_internal.cc
@@ -22,13 +22,13 @@
namespace iceberg::internal {
ArrowArrayGuard::~ArrowArrayGuard() {
- if (array_ != nullptr) {
+ if (array_ != nullptr && array_->release != nullptr) {
ArrowArrayRelease(array_);
}
}
ArrowSchemaGuard::~ArrowSchemaGuard() {
- if (schema_ != nullptr) {
+ if (schema_ != nullptr && schema_->release != nullptr) {
ArrowSchemaRelease(schema_);
}
}
diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc
index f5874108..9fae9c2b 100644
--- a/src/iceberg/data/position_delete_writer.cc
+++ b/src/iceberg/data/position_delete_writer.cc
@@ -19,26 +19,219 @@
#include "iceberg/data/position_delete_writer.h"
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class PositionDeleteWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions options) {
+ auto delete_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+ MetadataColumns::kDeleteFilePath,
+ MetadataColumns::kDeleteFilePos,
+ });
+
+ WriterOptions writer_options{
+ .path = options.path,
+ .schema = delete_schema,
+ .io = options.io,
+ .properties = WriterProperties::FromMap(options.properties),
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format, writer_options));
+
+ return std::unique_ptr<Impl>(
+ new Impl(std::move(options), std::move(delete_schema), std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(buffered_paths_.empty(),
+ "Cannot write batch data when there are buffered deletes.");
+ // TODO(anyone): Extract file paths from ArrowArray to update referenced_paths_.
+ return writer_->Write(data);
+ }
+
+ Status WriteDelete(std::string_view file_path, int64_t pos) {
+ // TODO(anyone): check if the sort order of file_path and pos observes the spec.
+ buffered_paths_.emplace_back(file_path);
+ buffered_positions_.push_back(pos);
+ referenced_paths_.emplace(file_path);
+
+ if (buffered_paths_.size() >= options_.flush_threshold) {
+ return FlushBuffer();
+ }
+ return {};
+ }
+
+ Result<int64_t> Length() const { return writer_->length(); }
+
+ Status Close() {
+ if (closed_) {
+ return {};
+ }
+ if (!buffered_paths_.empty()) {
+ ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ // Filter out metrics for delete metadata columns (file_path, pos) to avoid
+ // bloating the manifest, matching Java's PositionDeleteWriter behavior.
+ // Always remove field counts; also remove bounds when referencing multiple files.
+ const auto path_id = MetadataColumns::kDeleteFilePathColumnId;
+ const auto pos_id = MetadataColumns::kDeleteFilePosColumnId;
+
+ metrics.value_counts.erase(path_id);
+ metrics.value_counts.erase(pos_id);
+ metrics.null_value_counts.erase(path_id);
+ metrics.null_value_counts.erase(pos_id);
+ metrics.nan_value_counts.erase(path_id);
+ metrics.nan_value_counts.erase(pos_id);
+
+ if (referenced_paths_.size() > 1) {
+ metrics.lower_bounds.erase(path_id);
+ metrics.lower_bounds.erase(pos_id);
+ metrics.upper_bounds.erase(path_id);
+ metrics.upper_bounds.erase(pos_id);
+ }
+
+ // Serialize literal bounds to binary format
+ std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
+ for (const auto& [col_id, literal] : metrics.lower_bounds) {
+ ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+ lower_bounds_map[col_id] = std::move(serialized);
+ }
+ std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
+ for (const auto& [col_id, literal] : metrics.upper_bounds) {
+ ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+ upper_bounds_map[col_id] = std::move(serialized);
+ }
+
+ // Set referenced_data_file if all deletes reference the same data file
+ std::optional<std::string> referenced_data_file;
+ if (referenced_paths_.size() == 1) {
+ referenced_data_file = *referenced_paths_.begin();
+ }
+
+ auto data_file = std::make_shared<DataFile>(DataFile{
+ .content = DataFile::Content::kPositionDeletes,
+ .file_path = options_.path,
+ .file_format = options_.format,
+ .partition = options_.partition,
+ .record_count = metrics.row_count.value_or(-1),
+ .file_size_in_bytes = length,
+ .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
+ .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
+ .null_value_counts = {metrics.null_value_counts.begin(),
+ metrics.null_value_counts.end()},
+ .nan_value_counts = {metrics.nan_value_counts.begin(),
+ metrics.nan_value_counts.end()},
+ .lower_bounds = std::move(lower_bounds_map),
+ .upper_bounds = std::move(upper_bounds_map),
+ .split_offsets = std::move(split_offsets),
+ .sort_order_id = std::nullopt,
+ .referenced_data_file = std::move(referenced_data_file),
+ });
+
+ FileWriter::WriteResult result;
+ result.data_files.push_back(std::move(data_file));
+ return result;
+ }
+
+ private:
+ Impl(PositionDeleteWriterOptions options, std::shared_ptr<Schema> delete_schema,
+ std::unique_ptr<Writer> writer)
+ : options_(std::move(options)),
+ delete_schema_(std::move(delete_schema)),
+ writer_(std::move(writer)) {}
+
+ Status FlushBuffer() {
+ ArrowSchema arrow_schema;
+ ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema));
+ internal::ArrowSchemaGuard schema_guard(&arrow_schema);
+
+ ArrowArray array;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayInitFromSchema(&array, &arrow_schema, &error), error);
+ internal::ArrowArrayGuard array_guard(&array);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&array));
+
+ for (size_t i = 0; i < buffered_paths_.size(); ++i) {
+ ArrowStringView path_view(buffered_paths_[i].data(),
+ static_cast<int64_t>(buffered_paths_[i].size()));
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(
+ ArrowArrayAppendString(array.children[0], path_view));
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(
+ ArrowArrayAppendInt(array.children[1], buffered_positions_[i]));
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&array));
+ }
+
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayFinishBuildingDefault(&array, &error), error);
+
+ ICEBERG_RETURN_UNEXPECTED(writer_->Write(&array));
+
+ buffered_paths_.clear();
+ buffered_positions_.clear();
+ return {};
+ }
+
+ PositionDeleteWriterOptions options_;
+ std::shared_ptr<Schema> delete_schema_;
+ std::unique_ptr<Writer> writer_;
+ bool closed_ = false;
+ std::vector<std::string> buffered_paths_;
+ std::vector<int64_t> buffered_positions_;
+ std::set<std::string> referenced_paths_;
};
+PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr<Impl> impl)
+ : impl_(std::move(impl)) {}
+
PositionDeleteWriter::~PositionDeleteWriter() = default;
-Status PositionDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); }
+Result<std::unique_ptr<PositionDeleteWriter>> PositionDeleteWriter::Make(
+ const PositionDeleteWriterOptions& options) {
+ ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
+ return std::unique_ptr<PositionDeleteWriter>(new PositionDeleteWriter(std::move(impl)));
+}
+
+Status PositionDeleteWriter::Write(ArrowArray* data) { return impl_->Write(data); }
Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t pos) {
- return NotImplemented("");
+ return impl_->WriteDelete(file_path, pos);
}
-Result<int64_t> PositionDeleteWriter::Length() const { return NotImplemented(""); }
+Result<int64_t> PositionDeleteWriter::Length() const { return impl_->Length(); }
-Status PositionDeleteWriter::Close() { return NotImplemented(""); }
+Status PositionDeleteWriter::Close() { return impl_->Close(); }
Result<FileWriter::WriteResult> PositionDeleteWriter::Metadata() {
- return NotImplemented("");
+ return impl_->Metadata();
}
} // namespace iceberg
diff --git a/src/iceberg/data/position_delete_writer.h b/src/iceberg/data/position_delete_writer.h
index c660812c..de7a2c3f 100644
--- a/src/iceberg/data/position_delete_writer.h
+++ b/src/iceberg/data/position_delete_writer.h
@@ -46,7 +46,7 @@ struct ICEBERG_EXPORT PositionDeleteWriterOptions {
PartitionValues partition;
FileFormatType format = FileFormatType::kParquet;
std::shared_ptr<FileIO> io;
- std::shared_ptr<Schema> row_schema; // Optional row data schema
+ int64_t flush_threshold = 1000; // Number of buffered deletes before auto-flush
std::unordered_map<std::string, std::string> properties;
};
@@ -55,6 +55,10 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
public:
~PositionDeleteWriter() override;
+ /// \brief Create a new PositionDeleteWriter instance.
+ static Result<std::unique_ptr<PositionDeleteWriter>> Make(
+ const PositionDeleteWriterOptions& options);
+
Status Write(ArrowArray* data) override;
Status WriteDelete(std::string_view file_path, int64_t pos);
Result<int64_t> Length() const override;
@@ -64,6 +68,8 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
private:
class Impl;
std::unique_ptr<Impl> impl_;
+
+ explicit PositionDeleteWriter(std::unique_ptr<Impl> impl);
};
} // namespace iceberg
diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc
index 7671e7fe..f7778faf 100644
--- a/src/iceberg/test/data_writer_test.cc
+++ b/src/iceberg/test/data_writer_test.cc
@@ -27,8 +27,10 @@
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_register.h"
+#include "iceberg/data/position_delete_writer.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
#include "iceberg/parquet/parquet_register.h"
#include "iceberg/partition_spec.h"
#include "iceberg/row/partition_values.h"
@@ -264,4 +266,161 @@ TEST_F(DataWriterTest, WriteMultipleBatches) {
EXPECT_GT(data_file->file_size_in_bytes, 0);
}
+class PositionDeleteWriterTest : public DataWriterTest {
+ protected:
+ PositionDeleteWriterOptions MakeDeleteOptions(int64_t flush_threshold = 1000) {
+ return PositionDeleteWriterOptions{
+ .path = "test_deletes.parquet",
+ .schema = schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = FileFormatType::kParquet,
+ .io = file_io_,
+ .flush_threshold = flush_threshold,
+ .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+ };
+ }
+
+ std::shared_ptr<::arrow::Array> CreatePositionDeleteData() {
+ auto delete_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+ MetadataColumns::kDeleteFilePath, MetadataColumns::kDeleteFilePos});
+
+ ArrowSchema arrow_c_schema;
+ ICEBERG_THROW_NOT_OK(ToArrowSchema(*delete_schema, &arrow_c_schema));
+ auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+ return ::arrow::json::ArrayFromJSONString(
+ ::arrow::struct_(arrow_type->fields()),
+ R"([["data_file_1.parquet", 0], ["data_file_1.parquet", 5], ["data_file_1.parquet", 10]])")
+ .ValueOrDie();
+ }
+};
+
+TEST_F(PositionDeleteWriterTest, WriteDeleteAndClose) {
+ auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
+ ASSERT_THAT(writer->WriteDelete("data_file.parquet", 5), IsOk());
+ ASSERT_THAT(writer->WriteDelete("data_file.parquet", 10), IsOk());
+
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto length_result = writer->Length();
+ ASSERT_THAT(length_result, IsOk());
+ EXPECT_GT(length_result.value(), 0);
+}
+
+TEST_F(PositionDeleteWriterTest, MetadataAfterClose) {
+ auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
+ ASSERT_THAT(writer->WriteDelete("data_file.parquet", 5), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+
+ const auto& write_result = metadata_result.value();
+ ASSERT_EQ(write_result.data_files.size(), 1);
+
+ const auto& data_file = write_result.data_files[0];
+ EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+ EXPECT_EQ(data_file->file_path, "test_deletes.parquet");
+ EXPECT_EQ(data_file->file_format, FileFormatType::kParquet);
+ EXPECT_GT(data_file->file_size_in_bytes, 0);
+ EXPECT_FALSE(data_file->sort_order_id.has_value());
+}
+
+TEST_F(PositionDeleteWriterTest, MetadataBeforeCloseReturnsError) {
+ auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(metadata_result,
+ HasErrorMessage("Cannot get metadata before closing the writer"));
+}
+
+TEST_F(PositionDeleteWriterTest, CloseIsIdempotent) {
+ auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
+
+ ASSERT_THAT(writer->Close(), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
+}
+
+TEST_F(PositionDeleteWriterTest, WriteMultipleDeletes) {
+ auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ for (int64_t i = 0; i < 100; ++i) {
+ ASSERT_THAT(writer->WriteDelete("data_file.parquet", i), IsOk());
+ }
+
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+
+ const auto& data_file = metadata_result.value().data_files[0];
+ EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+ EXPECT_GT(data_file->file_size_in_bytes, 0);
+}
+
+TEST_F(PositionDeleteWriterTest, WriteBatchData) {
+ auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ auto test_data = CreatePositionDeleteData();
+ ArrowArray arrow_array;
+ ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
+ ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+
+ const auto& data_file = metadata_result.value().data_files[0];
+ EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+ EXPECT_GT(data_file->file_size_in_bytes, 0);
+}
+
+TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) {
+ // Use a small flush threshold to trigger automatic flush
+ const int64_t flush_threshold = 5;
+ auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions(flush_threshold));
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ // Write more deletes than the threshold to trigger auto-flush
+ for (int64_t i = 0; i < 12; ++i) {
+ ASSERT_THAT(writer->WriteDelete("data_file.parquet", i), IsOk());
+ }
+
+ // Length should be > 0 since auto-flush should have written data
+ auto length_result = writer->Length();
+ ASSERT_THAT(length_result, IsOk());
+ EXPECT_GT(length_result.value(), 0);
+
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+ const auto& data_file = metadata_result.value().data_files[0];
+ EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+ EXPECT_GT(data_file->file_size_in_bytes, 0);
+}
+
} // namespace iceberg