shangxinli commented on code in PR #552:
URL: https://github.com/apache/iceberg-cpp/pull/552#discussion_r2808241947
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
Review Comment:
Done. Changed to use aggregate initialization for `WriterOptions`.
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format, writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
Review Comment:
Good point. Since `writer_` is always initialized in `Make()` (the
constructor is private and only called after successful writer creation), the
check can never fail. Changed to `ICEBERG_DCHECK` for all three usages.
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,124 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ explicit Impl(DataWriterOptions options) : options_(std::move(options)) {}
+
+ Status Initialize() {
+ WriterOptions writer_options;
+ writer_options.path = options_.path;
+ writer_options.schema = options_.schema;
+ writer_options.io = options_.io;
+ writer_options.properties = WriterProperties::FromMap(options_.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(writer_,
+ WriterFactoryRegistry::Open(options_.format, writer_options));
+ return {};
+ }
+
+ Status Write(ArrowArray* data) {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ return writer_->length();
+ }
+
+ Status Close() {
+ if (!writer_) {
+ return InvalidArgument("Writer not initialized");
+ }
+ if (closed_) {
+ return InvalidArgument("Writer already closed");
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
Review Comment:
Agreed. Removed the thread-safety comment from the header.
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format, writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ // Idempotent: no-op if already closed
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
Review Comment:
Done. Changed to `ICEBERG_CHECK`, which returns `ValidationFailed` instead of
`InvalidArgument`. Updated the test expectation accordingly.
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format, writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ // Idempotent: no-op if already closed
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ auto data_file = std::make_shared<DataFile>();
+ data_file->content = DataFile::Content::kData;
+ data_file->file_path = options_.path;
+ data_file->file_format = options_.format;
+ data_file->partition = options_.partition;
+ data_file->record_count = metrics.row_count.value_or(0);
Review Comment:
Done. Changed to `value_or(-1)` to match the Java implementation.
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format, writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ // Idempotent: no-op if already closed
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ auto data_file = std::make_shared<DataFile>();
+ data_file->content = DataFile::Content::kData;
Review Comment:
Done. Changed to use aggregate initialization for `DataFile`. Also used
range constructors for the metrics maps (e.g. `{metrics.column_sizes.begin(),
metrics.column_sizes.end()}`) to simplify the conversion. The bounds maps still
need explicit loops due to the serialization step.
##########
src/iceberg/data/data_writer.cc:
##########
@@ -19,20 +19,118 @@
#include "iceberg/data/data_writer.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/util/macros.h"
+
namespace iceberg {
class DataWriter::Impl {
public:
+ static Result<std::unique_ptr<Impl>> Make(DataWriterOptions options) {
+ WriterOptions writer_options;
+ writer_options.path = options.path;
+ writer_options.schema = options.schema;
+ writer_options.io = options.io;
+ writer_options.properties = WriterProperties::FromMap(options.properties);
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ WriterFactoryRegistry::Open(options.format, writer_options));
+
+ return std::unique_ptr<Impl>(new Impl(std::move(options), std::move(writer)));
+ }
+
+ Status Write(ArrowArray* data) {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->Write(data);
+ }
+
+ Result<int64_t> Length() const {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ return writer_->length();
+ }
+
+ Status Close() {
+ ICEBERG_PRECHECK(writer_, "Writer not initialized");
+ if (closed_) {
+ // Idempotent: no-op if already closed
+ return {};
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer_->Close());
+ closed_ = true;
+ return {};
+ }
+
+ Result<FileWriter::WriteResult> Metadata() {
+ ICEBERG_PRECHECK(closed_, "Cannot get metadata before closing the writer");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto metrics, writer_->metrics());
+ ICEBERG_ASSIGN_OR_RAISE(auto length, writer_->length());
+ auto split_offsets = writer_->split_offsets();
+
+ auto data_file = std::make_shared<DataFile>();
+ data_file->content = DataFile::Content::kData;
+ data_file->file_path = options_.path;
+ data_file->file_format = options_.format;
+ data_file->partition = options_.partition;
+ data_file->record_count = metrics.row_count.value_or(0);
+ data_file->file_size_in_bytes = length;
+ data_file->sort_order_id = options_.sort_order_id;
+ data_file->split_offsets = std::move(split_offsets);
+
+ // Convert metrics maps from unordered_map to map
+ for (const auto& [col_id, size] : metrics.column_sizes) {
+ data_file->column_sizes[col_id] = size;
Review Comment:
That would be a nice cleanup. For now I've simplified the conversion by
using range constructors instead of explicit for-loops. Changing `Metrics` and
`DataFile` to use the same map type would be a good follow-up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]