This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 69cf2d3f feat: Implement PositionDeleteWriter for position delete files (#582)
69cf2d3f is described below

commit 69cf2d3f6735974cb7674ff1b6fc87b3b6d91948
Author: Xinli Shang <[email protected]>
AuthorDate: Sat Mar 14 19:43:00 2026 -0700

    feat: Implement PositionDeleteWriter for position delete files (#582)
    
    Implement the PositionDeleteWriter following the same PIMPL pattern as
    DataWriter. The writer supports both buffered WriteDelete(file_path,
    pos) calls and direct Write(ArrowArray*) for pre-formed batches.
    Metadata reports content=kPositionDeletes with sort_order_id=nullopt per
    spec, and tracks referenced_data_file when all deletes target a single
    file.
    
    ---------
    
    Co-authored-by: shangxinli <[email protected]>
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
---
 src/iceberg/arrow_c_data_guard_internal.cc |   4 +-
 src/iceberg/data/position_delete_writer.cc | 203 ++++++++++++++++++++++++++++-
 src/iceberg/data/position_delete_writer.h  |   8 +-
 src/iceberg/test/data_writer_test.cc       | 159 ++++++++++++++++++++++
 4 files changed, 366 insertions(+), 8 deletions(-)

diff --git a/src/iceberg/arrow_c_data_guard_internal.cc b/src/iceberg/arrow_c_data_guard_internal.cc
index 5fb3f9fa..fd421707 100644
--- a/src/iceberg/arrow_c_data_guard_internal.cc
+++ b/src/iceberg/arrow_c_data_guard_internal.cc
@@ -22,13 +22,13 @@
 namespace iceberg::internal {
 
 ArrowArrayGuard::~ArrowArrayGuard() {
-  if (array_ != nullptr) {
-    ArrowArrayRelease(array_);
-  }
+  // Nothing to do for a null guard, or for an array whose release callback is
+  // already null (per the Arrow C data interface, that marks it as released),
+  // so the callback is never invoked twice.
+  if (array_ == nullptr || array_->release == nullptr) {
+    return;
+  }
+  ArrowArrayRelease(array_);
 }
 
 ArrowSchemaGuard::~ArrowSchemaGuard() {
-  if (schema_ != nullptr) {
-    ArrowSchemaRelease(schema_);
-  }
+  // Skip a null guard and a schema already marked released (null release
+  // callback), so ArrowSchemaRelease runs at most once per schema.
+  if (schema_ == nullptr || schema_->release == nullptr) {
+    return;
+  }
+  ArrowSchemaRelease(schema_);
 }
diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc
index f5874108..9fae9c2b 100644
--- a/src/iceberg/data/position_delete_writer.cc
+++ b/src/iceberg/data/position_delete_writer.cc
@@ -19,26 +19,219 @@
 
 #include "iceberg/data/position_delete_writer.h"
 
+#include <map>
+#include <set>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
 namespace iceberg {
 
 class PositionDeleteWriter::Impl {
  public:
+  /// \brief Open the underlying file writer for a new position delete file.
+  ///
+  /// The file schema is always the reserved (file_path, pos) delete columns.
+  static Result<std::unique_ptr<Impl>> Make(PositionDeleteWriterOptions options) {
+    auto delete_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+        MetadataColumns::kDeleteFilePath,
+        MetadataColumns::kDeleteFilePos,
+    });
+
+    WriterOptions writer_options{
+        .path = options.path,
+        .schema = delete_schema,
+        .io = options.io,
+        .properties = WriterProperties::FromMap(options.properties),
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                            WriterFactoryRegistry::Open(options.format, writer_options));
+
+    return std::unique_ptr<Impl>(
+        new Impl(std::move(options), std::move(delete_schema), std::move(writer)));
+  }
+
+  /// \brief Write a pre-built batch of (file_path, pos) rows straight to the file.
+  ///
+  /// Rejected after Close() or while WriteDelete() calls are still buffered.
+  /// The batch is not inspected, so the data files it references are unknown
+  /// and Metadata() will not report a referenced_data_file afterwards.
+  Status Write(ArrowArray* data) {
+    ICEBERG_PRECHECK(!closed_, "Cannot write to a closed position delete writer.");
+    ICEBERG_PRECHECK(buffered_paths_.empty(),
+                     "Cannot write batch data when there are buffered deletes.");
+    // TODO(anyone): Extract file paths from ArrowArray to update referenced_paths_.
+    // Until then this batch may name files other than those in referenced_paths_
+    // (e.g. after an auto-flush emptied the buffer), so a single referenced
+    // data file can no longer be claimed.
+    has_untracked_paths_ = true;
+    return writer_->Write(data);
+  }
+
+  /// \brief Buffer one delete; flushes once `flush_threshold` deletes are buffered.
+  ///
+  /// Rejected after Close(); otherwise the delete would be buffered and then
+  /// silently dropped, since Close() does nothing once the writer is closed.
+  Status WriteDelete(std::string_view file_path, int64_t pos) {
+    ICEBERG_PRECHECK(!closed_, "Cannot write to a closed position delete writer.");
+    // TODO(anyone): check if the sort order of file_path and pos observes the spec.
+    buffered_paths_.emplace_back(file_path);
+    buffered_positions_.push_back(pos);
+    referenced_paths_.emplace(file_path);
+
+    // Compare in signed arithmetic: a negative threshold would otherwise be
+    // converted to a huge unsigned value and the buffer would never auto-flush.
+    if (static_cast<int64_t>(buffered_paths_.size()) >= options_.flush_threshold) {
+      return FlushBuffer();
+    }
+    return {};
+  }
+
+  /// \brief Length reported by the underlying writer; deletes still held in the
+  /// buffer have not been handed to it yet.
+  Result<int64_t> Length() const { return writer_->length(); }
+
+  /// \brief Flush any buffered deletes, then close the underlying writer.
+  /// Further calls after a successful close are no-ops.
+  Status Close() {
+    if (closed_) {
+      return {};
+    }
+    if (!buffered_paths_.empty()) {
+      ICEBERG_RETURN_UNEXPECTED(FlushBuffer());
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+    closed_ = true;
+    return {};
+  }
+
+  /// \brief Build the single position-delete DataFile entry for the closed file.
+  Result<FileWriter::WriteResult> Metadata() {
+    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
+
+    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+    auto split_offsets = writer_->split_offsets();
+
+    // Filter out metrics for delete metadata columns (file_path, pos) to avoid
+    // bloating the manifest, matching Java's PositionDeleteWriter behavior.
+    // Always remove field counts; also remove bounds when referencing multiple files.
+    const auto path_id = MetadataColumns::kDeleteFilePathColumnId;
+    const auto pos_id = MetadataColumns::kDeleteFilePosColumnId;
+
+    metrics.value_counts.erase(path_id);
+    metrics.value_counts.erase(pos_id);
+    metrics.null_value_counts.erase(path_id);
+    metrics.null_value_counts.erase(pos_id);
+    metrics.nan_value_counts.erase(path_id);
+    metrics.nan_value_counts.erase(pos_id);
+
+    if (referenced_paths_.size() > 1) {
+      metrics.lower_bounds.erase(path_id);
+      metrics.lower_bounds.erase(pos_id);
+      metrics.upper_bounds.erase(path_id);
+      metrics.upper_bounds.erase(pos_id);
+    }
+
+    // Serialize literal bounds to binary format
+    std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
+    for (const auto& [col_id, literal] : metrics.lower_bounds) {
+      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+      lower_bounds_map[col_id] = std::move(serialized);
+    }
+    std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
+    for (const auto& [col_id, literal] : metrics.upper_bounds) {
+      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+      upper_bounds_map[col_id] = std::move(serialized);
+    }
+
+    // Report referenced_data_file only when every delete is known to target the
+    // same data file. Batches passed to Write() are not inspected, so any such
+    // batch makes the set of referenced files unknown.
+    std::optional<std::string> referenced_data_file;
+    if (!has_untracked_paths_ && referenced_paths_.size() == 1) {
+      referenced_data_file = *referenced_paths_.begin();
+    }
+
+    auto data_file = std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kPositionDeletes,
+        .file_path = options_.path,
+        .file_format = options_.format,
+        .partition = options_.partition,
+        .record_count = metrics.row_count.value_or(-1),
+        .file_size_in_bytes = length,
+        .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
+        .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
+        .null_value_counts = {metrics.null_value_counts.begin(),
+                              metrics.null_value_counts.end()},
+        .nan_value_counts = {metrics.nan_value_counts.begin(),
+                             metrics.nan_value_counts.end()},
+        .lower_bounds = std::move(lower_bounds_map),
+        .upper_bounds = std::move(upper_bounds_map),
+        .split_offsets = std::move(split_offsets),
+        .sort_order_id = std::nullopt,
+        .referenced_data_file = std::move(referenced_data_file),
+    });
+
+    FileWriter::WriteResult result;
+    result.data_files.push_back(std::move(data_file));
+    return result;
+  }
+
+ private:
+  Impl(PositionDeleteWriterOptions options, std::shared_ptr<Schema> delete_schema,
+       std::unique_ptr<Writer> writer)
+      : options_(std::move(options)),
+        delete_schema_(std::move(delete_schema)),
+        writer_(std::move(writer)) {}
+
+  /// \brief Convert the buffered (path, pos) pairs into one Arrow struct array,
+  /// hand it to the underlying writer, and clear the buffer on success.
+  Status FlushBuffer() {
+    ArrowSchema arrow_schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*delete_schema_, &arrow_schema));
+    internal::ArrowSchemaGuard schema_guard(&arrow_schema);
+
+    ArrowArray array;
+    ArrowError error;
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+        ArrowArrayInitFromSchema(&array, &arrow_schema, &error), error);
+    internal::ArrowArrayGuard array_guard(&array);
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&array));
+
+    for (size_t i = 0; i < buffered_paths_.size(); ++i) {
+      ArrowStringView path_view(buffered_paths_[i].data(),
+                                static_cast<int64_t>(buffered_paths_[i].size()));
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(
+          ArrowArrayAppendString(array.children[0], path_view));
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(
+          ArrowArrayAppendInt(array.children[1], buffered_positions_[i]));
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&array));
+    }
+
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+        ArrowArrayFinishBuildingDefault(&array, &error), error);
+
+    ICEBERG_RETURN_UNEXPECTED(writer_->Write(&array));
+
+    buffered_paths_.clear();
+    buffered_positions_.clear();
+    return {};
+  }
+
+  PositionDeleteWriterOptions options_;
+  std::shared_ptr<Schema> delete_schema_;
+  std::unique_ptr<Writer> writer_;
+  bool closed_ = false;
+  // Set once Write() has accepted a batch whose file paths were not recorded
+  // in referenced_paths_.
+  bool has_untracked_paths_ = false;
+  std::vector<std::string> buffered_paths_;
+  std::vector<int64_t> buffered_positions_;
+  // Distinct data file paths passed to WriteDelete().
+  std::set<std::string> referenced_paths_;
 };
 
+PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr<Impl> impl)
+    : impl_(std::move(impl)) {}
+
 PositionDeleteWriter::~PositionDeleteWriter() = default;
 
-Status PositionDeleteWriter::Write(ArrowArray* data) { return 
NotImplemented(""); }
+Result<std::unique_ptr<PositionDeleteWriter>> PositionDeleteWriter::Make(
+    const PositionDeleteWriterOptions& options) {
+  ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
+  return std::unique_ptr<PositionDeleteWriter>(new 
PositionDeleteWriter(std::move(impl)));
+}
+
+Status PositionDeleteWriter::Write(ArrowArray* data) { return 
impl_->Write(data); }
 
 Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t 
pos) {
-  return NotImplemented("");
+  return impl_->WriteDelete(file_path, pos);
 }
 
-Result<int64_t> PositionDeleteWriter::Length() const { return 
NotImplemented(""); }
+Result<int64_t> PositionDeleteWriter::Length() const { return impl_->Length(); 
}
 
-Status PositionDeleteWriter::Close() { return NotImplemented(""); }
+Status PositionDeleteWriter::Close() { return impl_->Close(); }
 
 Result<FileWriter::WriteResult> PositionDeleteWriter::Metadata() {
-  return NotImplemented("");
+  return impl_->Metadata();
 }
 
 }  // namespace iceberg
diff --git a/src/iceberg/data/position_delete_writer.h b/src/iceberg/data/position_delete_writer.h
index c660812c..de7a2c3f 100644
--- a/src/iceberg/data/position_delete_writer.h
+++ b/src/iceberg/data/position_delete_writer.h
@@ -46,7 +46,7 @@ struct ICEBERG_EXPORT PositionDeleteWriterOptions {
   PartitionValues partition;
   FileFormatType format = FileFormatType::kParquet;
   std::shared_ptr<FileIO> io;
-  std::shared_ptr<Schema> row_schema;  // Optional row data schema
+  int64_t flush_threshold = 1000;  // Number of buffered deletes before 
auto-flush
   std::unordered_map<std::string, std::string> properties;
 };
 
@@ -55,6 +55,10 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
  public:
   ~PositionDeleteWriter() override;
 
+  /// \brief Create a new PositionDeleteWriter instance.
+  static Result<std::unique_ptr<PositionDeleteWriter>> Make(
+      const PositionDeleteWriterOptions& options);
+
   Status Write(ArrowArray* data) override;
   Status WriteDelete(std::string_view file_path, int64_t pos);
   Result<int64_t> Length() const override;
@@ -64,6 +68,8 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter {
  private:
   class Impl;
   std::unique_ptr<Impl> impl_;
+
+  explicit PositionDeleteWriter(std::unique_ptr<Impl> impl);
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc
index 7671e7fe..f7778faf 100644
--- a/src/iceberg/test/data_writer_test.cc
+++ b/src/iceberg/test/data_writer_test.cc
@@ -27,8 +27,10 @@
 
 #include "iceberg/arrow/arrow_fs_file_io_internal.h"
 #include "iceberg/avro/avro_register.h"
+#include "iceberg/data/position_delete_writer.h"
 #include "iceberg/file_format.h"
 #include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
 #include "iceberg/parquet/parquet_register.h"
 #include "iceberg/partition_spec.h"
 #include "iceberg/row/partition_values.h"
@@ -264,4 +266,161 @@ TEST_F(DataWriterTest, WriteMultipleBatches) {
   EXPECT_GT(data_file->file_size_in_bytes, 0);
 }
 
+class PositionDeleteWriterTest : public DataWriterTest {
+ protected:
+  /// Options for an uncompressed Parquet delete file at "test_deletes.parquet",
+  /// reusing the base fixture's schema, partition spec and FileIO.
+  PositionDeleteWriterOptions MakeDeleteOptions(int64_t flush_threshold = 1000) {
+    PositionDeleteWriterOptions options{
+        .path = "test_deletes.parquet",
+        .schema = schema_,
+        .spec = partition_spec_,
+        .partition = PartitionValues{},
+        .format = FileFormatType::kParquet,
+        .io = file_io_,
+        .flush_threshold = flush_threshold,
+        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+    };
+    return options;
+  }
+
+  /// Three deletes against "data_file_1.parquet" (positions 0, 5 and 10),
+  /// built as an Arrow struct array with the (file_path, pos) delete schema.
+  std::shared_ptr<::arrow::Array> CreatePositionDeleteData() {
+    const auto delete_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+        MetadataColumns::kDeleteFilePath, MetadataColumns::kDeleteFilePos});
+
+    ArrowSchema arrow_c_schema;
+    ICEBERG_THROW_NOT_OK(ToArrowSchema(*delete_schema, &arrow_c_schema));
+    const auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+    const auto struct_type = ::arrow::struct_(arrow_type->fields());
+
+    return ::arrow::json::ArrayFromJSONString(
+               struct_type,
+               R"([["data_file_1.parquet", 0], ["data_file_1.parquet", 5], ["data_file_1.parquet", 10]])")
+        .ValueOrDie();
+  }
+};
+
+TEST_F(PositionDeleteWriterTest, WriteDeleteAndClose) {
+  auto maybe_writer = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(maybe_writer, IsOk());
+  auto writer = std::move(maybe_writer.value());
+
+  // Three deletes against one data file, all below the default flush threshold.
+  for (const int64_t pos : {0, 5, 10}) {
+    ASSERT_THAT(writer->WriteDelete("data_file.parquet", pos), IsOk());
+  }
+
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  // Close() flushed the buffer, so the written file must be non-empty.
+  auto file_length = writer->Length();
+  ASSERT_THAT(file_length, IsOk());
+  EXPECT_GT(file_length.value(), 0);
+}
+
+TEST_F(PositionDeleteWriterTest, MetadataAfterClose) {
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
+  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 5), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+
+  const auto& write_result = metadata_result.value();
+  ASSERT_EQ(write_result.data_files.size(), 1);
+
+  const auto& data_file = write_result.data_files[0];
+  EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+  EXPECT_EQ(data_file->file_path, "test_deletes.parquet");
+  EXPECT_EQ(data_file->file_format, FileFormatType::kParquet);
+  EXPECT_GT(data_file->file_size_in_bytes, 0);
+  // Position delete files carry no sort order id.
+  EXPECT_FALSE(data_file->sort_order_id.has_value());
+  // Every delete targets the same data file, so the writer must report it.
+  ASSERT_TRUE(data_file->referenced_data_file.has_value());
+  EXPECT_EQ(data_file->referenced_data_file.value(), "data_file.parquet");
+}
+
+TEST_F(PositionDeleteWriterTest, MetadataBeforeCloseReturnsError) {
+  auto maybe_writer = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(maybe_writer, IsOk());
+  auto writer = std::move(maybe_writer.value());
+
+  // Metadata is only available after a successful Close().
+  auto premature_metadata = writer->Metadata();
+  ASSERT_THAT(premature_metadata, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(premature_metadata,
+              HasErrorMessage("Cannot get metadata before closing the writer"));
+}
+
+TEST_F(PositionDeleteWriterTest, CloseIsIdempotent) {
+  auto maybe_writer = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(maybe_writer, IsOk());
+  auto writer = std::move(maybe_writer.value());
+
+  ASSERT_THAT(writer->WriteDelete("data_file.parquet", 0), IsOk());
+
+  // The first Close() flushes and closes; the repeats must be harmless no-ops.
+  for (int attempt = 0; attempt < 3; ++attempt) {
+    ASSERT_THAT(writer->Close(), IsOk());
+  }
+}
+
+TEST_F(PositionDeleteWriterTest, WriteMultipleDeletes) {
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  for (int64_t i = 0; i < 100; ++i) {
+    ASSERT_THAT(writer->WriteDelete("data_file.parquet", i), IsOk());
+  }
+
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+
+  // Guard the index below: an empty result must fail the test, not be UB.
+  const auto& data_files = metadata_result.value().data_files;
+  ASSERT_EQ(data_files.size(), 1);
+
+  const auto& data_file = data_files[0];
+  EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+  EXPECT_GT(data_file->file_size_in_bytes, 0);
+  // All 100 deletes target the same data file.
+  ASSERT_TRUE(data_file->referenced_data_file.has_value());
+  EXPECT_EQ(data_file->referenced_data_file.value(), "data_file.parquet");
+}
+
+TEST_F(PositionDeleteWriterTest, WriteBatchData) {
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  auto test_data = CreatePositionDeleteData();
+  ArrowArray arrow_array;
+  ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
+  ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+
+  // Guard the index below: an empty result must fail the test, not be UB.
+  const auto& data_files = metadata_result.value().data_files;
+  ASSERT_EQ(data_files.size(), 1);
+
+  const auto& data_file = data_files[0];
+  EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+  EXPECT_GT(data_file->file_size_in_bytes, 0);
+  // Batch rows are not inspected for file paths, so no single referenced data
+  // file may be claimed.
+  EXPECT_FALSE(data_file->referenced_data_file.has_value());
+}
+
+TEST_F(PositionDeleteWriterTest, AutoFlushOnThreshold) {
+  // Use a small flush threshold to trigger automatic flush
+  const int64_t flush_threshold = 5;
+  auto writer_result = PositionDeleteWriter::Make(MakeDeleteOptions(flush_threshold));
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  // Write more deletes than the threshold to trigger auto-flush
+  for (int64_t i = 0; i < 12; ++i) {
+    ASSERT_THAT(writer->WriteDelete("data_file.parquet", i), IsOk());
+  }
+
+  // Length should be > 0 since auto-flush should have written data
+  auto length_result = writer->Length();
+  ASSERT_THAT(length_result, IsOk());
+  EXPECT_GT(length_result.value(), 0);
+
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+
+  // Guard the index below: an empty result must fail the test, not be UB.
+  const auto& data_files = metadata_result.value().data_files;
+  ASSERT_EQ(data_files.size(), 1);
+
+  const auto& data_file = data_files[0];
+  EXPECT_EQ(data_file->content, DataFile::Content::kPositionDeletes);
+  EXPECT_GT(data_file->file_size_in_bytes, 0);
+}
+
 }  // namespace iceberg

Reply via email to