This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 63e4ec07 feat: implement DataWriter for Iceberg data files (#552)
63e4ec07 is described below
commit 63e4ec079c7af4cf18bfd55a22c52d521a3219f0
Author: Xinli Shang <[email protected]>
AuthorDate: Wed Feb 25 01:23:22 2026 -0800
feat: implement DataWriter for Iceberg data files (#552)
Implements DataWriter class for writing Iceberg data files as part of
issue #441 (task 2).
Implementation:
- Factory method DataWriter::Make() for creating writer instances
- Support for Parquet and Avro file formats via WriterFactoryRegistry
- Complete DataFile metadata generation including partition info, column
statistics, serialized bounds, and sort order ID
- Proper lifecycle management with Initialize/Write/Close/Metadata
- PIMPL idiom for ABI stability
---
src/iceberg/data/data_writer.cc | 106 ++++++++++++++-
src/iceberg/data/data_writer.h | 5 +
src/iceberg/test/data_writer_test.cc | 246 ++++++++++++++++++++++++++++++++++-
3 files changed, 352 insertions(+), 5 deletions(-)
diff --git a/src/iceberg/data/data_writer.cc b/src/iceberg/data/data_writer.cc
index 0998e9ef..b00465bb 100644
--- a/src/iceberg/data/data_writer.cc
+++ b/src/iceberg/data/data_writer.cc
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include <map>
+
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options{
+ .path = options.path,
+ .schema = options.schema,
+ .io = options.io,
+ .properties = WriterProperties::FromMap(options.properties),
+ };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                            WriterFactoryRegistry::Open(options.format, writer_options));
+
+    return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_DCHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ // Idempotent: no-op if already closed
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_CHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ // Serialize literal bounds to binary format
+ std::map<int32_t, std::vector<uint8_t>> lower_bounds_map;
+ for (const auto& [col_id, literal] : metrics.lower_bounds) {
+ ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+ lower_bounds_map[col_id] = std::move(serialized);
+ }
+ std::map<int32_t, std::vector<uint8_t>> upper_bounds_map;
+ for (const auto& [col_id, literal] : metrics.upper_bounds) {
+ ICEBERG_ASSIGN_OR_RAISE(auto serialized, literal.Serialize());
+ upper_bounds_map[col_id] = std::move(serialized);
+ }
+
+ auto data_file = std::make_shared<DataFile>(DataFile{
+ .content = DataFile::Content::kData,
+ .file_path = options_.path,
+ .file_format = options_.format,
+ .partition = options_.partition,
+ .record_count = metrics.row_count.value_or(-1),
+ .file_size_in_bytes = length,
+        .column_sizes = {metrics.column_sizes.begin(), metrics.column_sizes.end()},
+        .value_counts = {metrics.value_counts.begin(), metrics.value_counts.end()},
+ .null_value_counts = {metrics.null_value_counts.begin(),
+ metrics.null_value_counts.end()},
+ .nan_value_counts = {metrics.nan_value_counts.begin(),
+ metrics.nan_value_counts.end()},
+ .lower_bounds = std::move(lower_bounds_map),
+ .upper_bounds = std::move(upper_bounds_map),
+ .split_offsets = std::move(split_offsets),
+ .sort_order_id = options_.sort_order_id,
+ });
+
+ FileWriter::WriteResult result;
+ result.data_files.push_back(std::move(data_file));
+ return result;
+ }
+
+ private:
+ Impl(DataWriterOptions options, std::unique_ptr<Writer> writer)
+ : options_(std::move(options)), writer_(std::move(writer)) {}
+
+ DataWriterOptions options_;
+ std::unique_ptr<Writer> writer_;
+ bool closed_ = false;
};
+DataWriter::DataWriter(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+
DataWriter::~DataWriter() = default;
-Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); }
+Result<std::unique_ptr<DataWriter>> DataWriter::Make(const DataWriterOptions& options) {
+ ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(options));
+ return std::unique_ptr<DataWriter>(new DataWriter(std::move(impl)));
+}
+
+Status DataWriter::Write(ArrowArray* data) { return impl_->Write(data); }
-Result<int64_t> DataWriter::Length() const { return NotImplemented(""); }
+Result<int64_t> DataWriter::Length() const { return impl_->Length(); }
-Status DataWriter::Close() { return NotImplemented(""); }
+Status DataWriter::Close() { return impl_->Close(); }
-Result<FileWriter::WriteResult> DataWriter::Metadata() { return NotImplemented(""); }
+Result<FileWriter::WriteResult> DataWriter::Metadata() { return impl_->Metadata(); }
} // namespace iceberg
diff --git a/src/iceberg/data/data_writer.h b/src/iceberg/data/data_writer.h
index 08ac5f70..380c97e2 100644
--- a/src/iceberg/data/data_writer.h
+++ b/src/iceberg/data/data_writer.h
@@ -55,6 +55,9 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
public:
~DataWriter() override;
+ /// \brief Create a new DataWriter instance.
+  static Result<std::unique_ptr<DataWriter>> Make(const DataWriterOptions& options);
+
Status Write(ArrowArray* data) override;
Result<int64_t> Length() const override;
Status Close() override;
@@ -63,6 +66,8 @@ class ICEBERG_EXPORT DataWriter : public FileWriter {
private:
class Impl;
std::unique_ptr<Impl> impl_;
+
+ explicit DataWriter(std::unique_ptr<Impl> impl);
};
} // namespace iceberg
diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc
index 9379becb..7671e7fe 100644
--- a/src/iceberg/test/data_writer_test.cc
+++ b/src/iceberg/test/data_writer_test.cc
@@ -17,7 +17,251 @@
* under the License.
*/
+#include "iceberg/data/data_writer.h"
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-namespace iceberg {} // namespace iceberg
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+using ::testing::HasSubstr;
+
+class DataWriterTest : public ::testing::Test {
+ protected:
+ static void SetUpTestSuite() {
+ parquet::RegisterAll();
+ avro::RegisterAll();
+ }
+
+ void SetUp() override {
+ file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+ schema_ = std::make_shared<Schema>(
+ std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", string())});
+ partition_spec_ = PartitionSpec::Unpartitioned();
+ }
+
+ DataWriterOptions MakeDefaultOptions(
+ std::optional<int32_t> sort_order_id = std::nullopt,
+ PartitionValues partition = PartitionValues{}) {
+ return DataWriterOptions{
+ .path = "test_data.parquet",
+ .schema = schema_,
+ .spec = partition_spec_,
+ .partition = std::move(partition),
+ .format = FileFormatType::kParquet,
+ .io = file_io_,
+ .sort_order_id = sort_order_id,
+ .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+ };
+ }
+
+ std::shared_ptr<::arrow::Array> CreateTestData() {
+ ArrowSchema arrow_c_schema;
+ ICEBERG_THROW_NOT_OK(ToArrowSchema(*schema_, &arrow_c_schema));
+ auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie();
+
+ return ::arrow::json::ArrayFromJSONString(
+ ::arrow::struct_(arrow_schema->fields()),
+ R"([[1, "Alice"], [2, "Bob"], [3, "Charlie"]])")
+ .ValueOrDie();
+ }
+
+ void WriteTestDataToWriter(DataWriter* writer) {
+ auto test_data = CreateTestData();
+ ArrowArray arrow_array;
+ ASSERT_TRUE(::arrow::ExportArray(*test_data, &arrow_array).ok());
+ ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+ }
+
+ std::shared_ptr<FileIO> file_io_;
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<PartitionSpec> partition_spec_;
+};
+
+class DataWriterFormatTest
+ : public DataWriterTest,
+      public ::testing::WithParamInterface<std::pair<FileFormatType, std::string>> {};
+
+TEST_P(DataWriterFormatTest, CreateWithFormat) {
+ auto [format, path] = GetParam();
+ DataWriterOptions options{
+ .path = path,
+ .schema = schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = format,
+ .io = file_io_,
+ .properties =
+ format == FileFormatType::kParquet
+              ? std::unordered_map<std::string,
+                                   std::string>{{"write.parquet.compression-codec",
+                                                 "uncompressed"}}
+ : std::unordered_map<std::string, std::string>{},
+ };
+
+ auto writer_result = DataWriter::Make(options);
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+ ASSERT_NE(writer, nullptr);
+}
+
+INSTANTIATE_TEST_SUITE_P(
+ FormatTypes, DataWriterFormatTest,
+    ::testing::Values(std::make_pair(FileFormatType::kParquet, "test_data.parquet"),
+                      std::make_pair(FileFormatType::kAvro, "test_data.avro")));
+
+TEST_F(DataWriterTest, WriteAndClose) {
+ auto writer_result = DataWriter::Make(MakeDefaultOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ // Write data
+ WriteTestDataToWriter(writer.get());
+
+ // Length should be greater than 0 after write
+ auto length_result = writer->Length();
+ ASSERT_THAT(length_result, IsOk());
+ EXPECT_GT(length_result.value(), 0);
+
+ // Close
+ ASSERT_THAT(writer->Close(), IsOk());
+}
+
+TEST_F(DataWriterTest, MetadataAfterClose) {
+ auto writer_result = DataWriter::Make(MakeDefaultOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToWriter(writer.get());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ // Get metadata
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+
+ const auto& write_result = metadata_result.value();
+ ASSERT_EQ(write_result.data_files.size(), 1);
+
+ const auto& data_file = write_result.data_files[0];
+ EXPECT_EQ(data_file->content, DataFile::Content::kData);
+ EXPECT_EQ(data_file->file_path, "test_data.parquet");
+ EXPECT_EQ(data_file->file_format, FileFormatType::kParquet);
+ EXPECT_GT(data_file->file_size_in_bytes, 0);
+
+ // Metrics availability depends on the underlying writer implementation
+ EXPECT_GE(data_file->column_sizes.size(), 0);
+ EXPECT_GE(data_file->value_counts.size(), 0);
+ EXPECT_GE(data_file->null_value_counts.size(), 0);
+}
+
+TEST_F(DataWriterTest, MetadataBeforeCloseReturnsError) {
+ auto writer_result = DataWriter::Make(MakeDefaultOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ // Try to get metadata before closing
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(metadata_result,
+              HasErrorMessage("Cannot get metadata before closing the writer"));
+}
+
+TEST_F(DataWriterTest, CloseIsIdempotent) {
+ auto writer_result = DataWriter::Make(MakeDefaultOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToWriter(writer.get());
+
+ ASSERT_THAT(writer->Close(), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
+}
+
+TEST_F(DataWriterTest, SortOrderIdInMetadata) {
+ // Test with explicit sort order id
+ {
+ const int32_t sort_order_id = 42;
+ auto writer_result = DataWriter::Make(MakeDefaultOptions(sort_order_id));
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToWriter(writer.get());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+ const auto& data_file = metadata_result.value().data_files[0];
+ ASSERT_TRUE(data_file->sort_order_id.has_value());
+ EXPECT_EQ(data_file->sort_order_id.value(), sort_order_id);
+ }
+
+ // Test without sort order id (should be nullopt)
+ {
+ auto writer_result = DataWriter::Make(MakeDefaultOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToWriter(writer.get());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+ const auto& data_file = metadata_result.value().data_files[0];
+ EXPECT_FALSE(data_file->sort_order_id.has_value());
+ }
+}
+
+TEST_F(DataWriterTest, PartitionValuesPreserved) {
+  PartitionValues partition_values({Literal::Int(42), Literal::String("test")});
+
+ auto writer_result =
+ DataWriter::Make(MakeDefaultOptions(std::nullopt, partition_values));
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToWriter(writer.get());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+ const auto& data_file = metadata_result.value().data_files[0];
+
+ EXPECT_EQ(data_file->partition.num_fields(), partition_values.num_fields());
+ EXPECT_EQ(data_file->partition.num_fields(), 2);
+}
+
+TEST_F(DataWriterTest, WriteMultipleBatches) {
+ auto writer_result = DataWriter::Make(MakeDefaultOptions());
+ ASSERT_THAT(writer_result, IsOk());
+ auto writer = std::move(writer_result.value());
+
+ WriteTestDataToWriter(writer.get());
+ WriteTestDataToWriter(writer.get());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ auto metadata_result = writer->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+ const auto& data_file = metadata_result.value().data_files[0];
+ EXPECT_GT(data_file->file_size_in_bytes, 0);
+}
+
+} // namespace iceberg