This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 63e4ec07 feat: implement DataWriter for Iceberg data files (#552)
63e4ec07 is described below

commit 63e4ec079c7af4cf18bfd55a22c52d521a3219f0
Author: Xinli Shang <[email protected]>
AuthorDate: Wed Feb 25 01:23:22 2026 -0800

    feat: implement DataWriter for Iceberg data files (#552)
    
    Implements DataWriter class for writing Iceberg data files as part of
    issue #441 (task 2).
    
    Implementation:
    - Factory method DataWriter::Make() for creating writer instances
    - Support for Parquet and Avro file formats via WriterFactoryRegistry
    - Complete DataFile metadata generation including partition info, column
    statistics, serialized bounds, and sort order ID
    - Proper lifecycle management with Initialize/Write/Close/Metadata
    - PIMPL idiom for ABI stability
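    
    A minimal usage sketch under the API added here (schema, file_io, and
    arrow_array are assumed to already exist; the path is illustrative and
    not part of this change):
    
        // Inside a function returning Status or Result<...>.
        DataWriterOptions options{
            .path = "data/example.parquet",  // illustrative
            .schema = schema,
            .spec = PartitionSpec::Unpartitioned(),
            .partition = PartitionValues{},
            .format = FileFormatType::kParquet,
            .io = file_io,
        };
        ICEBERG_ASSIGN_OR_RAISE(auto writer, DataWriter::Make(options));
        ICEBERG_RETURN_UNEXPECTED(writer->Write(&arrow_array));  // ArrowArray*
        ICEBERG_RETURN_UNEXPECTED(writer->Close());  // safe to call twice
        // Metadata() is valid only after Close(); yields one DataFile entry.
        ICEBERG_ASSIGN_OR_RAISE(auto write_result, writer->Metadata());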
---
 src/iceberg/data/data_writer.cc      | 106 ++++++++++++++-
 src/iceberg/data/data_writer.h       |   5 +
 src/iceberg/test/data_writer_test.cc | 246 ++++++++++++++++++++++++++++++++++-
 3 files changed, 352 insertions(+), 5 deletions(-)

diff --git a/src/iceberg/data/data_writer.cc b/src/iceberg/data/data_writer.cc
index 0998e9ef..b00465bb 100644
--- a/src/iceberg/data/data_writer.cc
+++ b/src/iceberg/data/data_writer.cc
@@ -19,20 +19,118 @@
 
 #include "iceberg/data/data_writer.h"
 
+#include <map>
+
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
 namespace iceberg {
 
 class DataWriter::Impl {
  public:
+  static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+    WriterOptions writer_options{
+        .path = options.path,
+        .schema = options.schema,
+        .io = options.io,
+        .properties = WriterProperties::FromMap(options.properties),
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                            WriterFactoryRegistry::Open(options.format, writer_options));
+
+    return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
+  }
+
+  Status Write(ArrowArray* data) {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    return writer_->Write(data);
+  }
+
+  Result<int64_t> Length() const {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    return writer_->length();
+  }
+
+  Status Close() {
+    ICEBERG_DCHECK(writer_, "Writer not initialized");
+    if (closed_) {
+      // Idempotent: no-op if already closed
+      return {};
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+    closed_ = true;
+    return {};
+  }
+
+  Result<FileWriter::WriteResult> Metadata() {
+    ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
+
+    ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+    ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+    auto split_offsets = writer_->split_offsets();
+
+    // Serialize literal bounds to binary format
+    std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
+    for (const auto& [col_id, literal] : metrics.lower_bounds) {
+      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+      lower_bounds_map[col_id] = std::move(serialized);
+    }
+    std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
+    for (const auto& [col_id, literal] : metrics.upper_bounds) {
+      ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+      upper_bounds_map[col_id] = std::move(serialized);
+    }
+
+    auto data_file = std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kData,
+        .file_path = options_.path,
+        .file_format = options_.format,
+        .partition = options_.partition,
+        .record_count = metrics.row_count.value_or(-1),
+        .file_size_in_bytes = length,
+        .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
+        .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
+        .null_value_counts = {metrics.null_value_counts.begin(),
+                              metrics.null_value_counts.end()},
+        .nan_value_counts = {metrics.nan_value_counts.begin(),
+                             metrics.nan_value_counts.end()},
+        .lower_bounds = std::move(lower_bounds_map),
+        .upper_bounds = std::move(upper_bounds_map),
+        .split_offsets = std::move(split_offsets),
+        .sort_order_id = options_.sort_order_id,
+    });
+
+    FileWriter::WriteResult result;
+    result.data_files.push_back(std::move(data_file));
+    return result;
+  }
+
+ private:
+  Impl(DataWriterOptions options, std::unique_ptr<Writer> writer)
+      : options_(std::move(options)), writer_(std::move(writer)) {}
+
+  DataWriterOptions options_;
+  std::unique_ptr<Writer> writer_;
+  bool closed_ = false;
 };
 
+DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+
 DataWriter::~DataWriter() = default;
 
-Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); }
+Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
+  ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
+  return std::unique_ptr<DataWriter>(new DataWriter(std::move(impl)));
+}
+
+Status DataWriter::Write(ArrowArray* data) { return impl_->Write(data); }
 
-Result<int64_t> DataWriter::Length() const { return NotImplemented(""); }
+Result<int64_t> DataWriter::Length() const { return impl_->Length(); }
 
-Status DataWriter::Close() { return NotImplemented(""); }
+Status DataWriter::Close() { return impl_->Close(); }
 
-Result<FileWriter::WriteResult> DataWriter::Metadata() { return NotImplemented(""); }
+Result<FileWriter::WriteResult> DataWriter::Metadata() { return impl_->Metadata(); }
 
 }  // namespace iceberg
diff --git a/src/iceberg/data/data_writer.h b/src/iceberg/data/data_writer.h
index 08ac5f70..380c97e2 100644
--- a/src/iceberg/data/data_writer.h
+++ b/src/iceberg/data/data_writer.h
@@ -55,6 +55,9 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
  public:
   ~DataWriter() override;
 
+  /// \brief Create a new DataWriter instance.
+  static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);
+
   Status Write(ArrowArray* data) override;
   Result<int64_t> Length() const override;
   Status Close() override;
@@ -63,6 +66,8 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
  private:
   class Impl;
   std::unique_ptr<Impl> impl_;
+
+  explicit DataWriter(std::unique_ptr<Impl> impl);
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc
index 9379becb..7671e7fe 100644
--- a/src/iceberg/test/data_writer_test.cc
+++ b/src/iceberg/test/data_writer_test.cc
@@ -17,7 +17,251 @@
  * under the License.
  */
 
+#include "iceberg/data/data_writer.h"
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-namespace iceberg {}  // namespace iceberg
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+using ::testing::HasSubstr;
+
+class DataWriterTest : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() {
+    parquet::RegisterAll();
+    avro::RegisterAll();
+  }
+
+  void SetUp() override {
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+    schema_ = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", string())});
+    partition_spec_ = PartitionSpec::Unpartitioned();
+  }
+
+  DataWriterOptions MakeDefaultOptions(
+      std::optional<int32_t> sort_order_id = std::nullopt,
+      PartitionValues partition = PartitionValues{}) {
+    return DataWriterOptions{
+        .path = "test_data.parquet",
+        .schema = schema_,
+        .spec = partition_spec_,
+        .partition = std::move(partition),
+        .format = FileFormatType::kParquet,
+        .io = file_io_,
+        .sort_order_id = sort_order_id,
+        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+    };
+  }
+
+  std::shared_ptr<::arrow::Array> CreateTestData() {
+    ArrowSchema arrow_c_schema;
+    ICEBERG_THROW_NOT_OK(ToArrowSchema(*schema_, &arrow_c_schema));
+    auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+    return ::arrow::json::ArrayFromJSONString(
+               ::arrow::struct_(arrow_schema->fields()),
+               R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])")
+        .ValueOrDie();
+  }
+
+  void WriteTestDataToWriter(DataWriter* writer) {
+    auto test_data = CreateTestData();
+    ArrowArray arrow_array;
+    ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
+    ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> partition_spec_;
+};
+
+class DataWriterFormatTest
+    : public DataWriterTest,
+      public ::testing::WithParamInterface<std::pair<FileFormatType, std::string>> {};
+
+TEST_P(DataWriterFormatTest, CreateWithFormat) {
+  auto [format, path] = GetParam();
+  DataWriterOptions options{
+      .path = path,
+      .schema = schema_,
+      .spec = partition_spec_,
+      .partition = PartitionValues{},
+      .format = format,
+      .io = file_io_,
+      .properties =
+          format == FileFormatType::kParquet
+              ? std::unordered_map<std::string,
+                                   std::string>{{"write.parquet.compression-codec",
+                                                 "uncompressed"}}
+              : std::unordered_map<std::string, std::string>{},
+  };
+
+  auto writer_result = DataWriter::Make(options);
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+  ASSERT_NE(writer, nullptr);
+}
+
+INSTANTIATE_TEST_SUITE_P(
+    FormatTypes, DataWriterFormatTest,
+    ::testing::Values(std::make_pair(FileFormatType::kParquet, "test_data.parquet"),
+                      std::make_pair(FileFormatType::kAvro, "test_data.avro")));
+
+TEST_F(DataWriterTest, WriteAndClose) {
+  auto writer_result = DataWriter::Make(MakeDefaultOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  // Write data
+  WriteTestDataToWriter(writer.get());
+
+  // Length should be greater than 0 after write
+  auto length_result = writer->Length();
+  ASSERT_THAT(length_result, IsOk());
+  EXPECT_GT(length_result.value(), 0);
+
+  // Close
+  ASSERT_THAT(writer->Close(), IsOk());
+}
+
+TEST_F(DataWriterTest, MetadataAfterClose) {
+  auto writer_result = DataWriter::Make(MakeDefaultOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  WriteTestDataToWriter(writer.get());
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  // Get metadata
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+
+  const auto& write_result = metadata_result.value();
+  ASSERT_EQ(write_result.data_files.size(), 1);
+
+  const auto& data_file = write_result.data_files[0];
+  EXPECT_EQ(data_file->content, DataFile::Content::kData);
+  EXPECT_EQ(data_file->file_path, "test_data.parquet");
+  EXPECT_EQ(data_file->file_format, FileFormatType::kParquet);
+  EXPECT_GT(data_file->file_size_in_bytes, 0);
+
+  // Metrics availability depends on the underlying writer implementation
+  EXPECT_GE(data_file->column_sizes.size(), 0);
+  EXPECT_GE(data_file->value_counts.size(), 0);
+  EXPECT_GE(data_file->null_value_counts.size(), 0);
+}
+
+TEST_F(DataWriterTest, MetadataBeforeCloseReturnsError) {
+  auto writer_result = DataWriter::Make(MakeDefaultOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  // Try to get metadata before closing
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(metadata_result,
+              HasErrorMessage("Cannot get metadata before closing the writer"));
+}
+
+TEST_F(DataWriterTest, CloseIsIdempotent) {
+  auto writer_result = DataWriter::Make(MakeDefaultOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  WriteTestDataToWriter(writer.get());
+
+  ASSERT_THAT(writer->Close(), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+  ASSERT_THAT(writer->Close(), IsOk());
+}
+
+TEST_F(DataWriterTest, SortOrderIdInMetadata) {
+  // Test with explicit sort order id
+  {
+    const int32_t sort_order_id = 42;
+    auto writer_result = DataWriter::Make(MakeDefaultOptions(sort_order_id));
+    ASSERT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    WriteTestDataToWriter(writer.get());
+    ASSERT_THAT(writer->Close(), IsOk());
+
+    auto metadata_result = writer->Metadata();
+    ASSERT_THAT(metadata_result, IsOk());
+    const auto& data_file = metadata_result.value().data_files[0];
+    ASSERT_TRUE(data_file->sort_order_id.has_value());
+    EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id);
+  }
+
+  // Test without sort order id (should be nullopt)
+  {
+    auto writer_result = DataWriter::Make(MakeDefaultOptions());
+    ASSERT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    WriteTestDataToWriter(writer.get());
+    ASSERT_THAT(writer->Close(), IsOk());
+
+    auto metadata_result = writer->Metadata();
+    ASSERT_THAT(metadata_result, IsOk());
+    const auto& data_file = metadata_result.value().data_files[0];
+    EXPECT_FALSE(data_file->sort_order_id.has_value());
+  }
+}
+
+TEST_F(DataWriterTest, PartitionValuesPreserved) {
+  PartitionValues partition_values({Literal::Int(42), Literal::String("test")});
+
+  auto writer_result =
+      DataWriter::Make(MakeDefaultOptions(std::nullopt, partition_values));
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  WriteTestDataToWriter(writer.get());
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+  const auto& data_file = metadata_result.value().data_files[0];
+
+  EXPECT_EQ(data_file->partition.num_fields(), partition_values.num_fields());
+  EXPECT_EQ(data_file->partition.num_fields(), 2);
+}
+
+TEST_F(DataWriterTest, WriteMultipleBatches) {
+  auto writer_result = DataWriter::Make(MakeDefaultOptions());
+  ASSERT_THAT(writer_result, IsOk());
+  auto writer = std::move(writer_result.value());
+
+  WriteTestDataToWriter(writer.get());
+  WriteTestDataToWriter(writer.get());
+  ASSERT_THAT(writer->Close(), IsOk());
+
+  auto metadata_result = writer->Metadata();
+  ASSERT_THAT(metadata_result, IsOk());
+  const auto& data_file = metadata_result.value().data_files[0];
+  EXPECT_GT(data_file->file_size_in_bytes, 0);
+}
+
+}  // namespace iceberg
