wgtmac commented on code in PR #443: URL: https://github.com/apache/iceberg-cpp/pull/443#discussion_r2650259628
########## src/iceberg/manifest/rolling_manifest_writer.h: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/rolling_manifest_writer.h +/// Rolling manifest writer that can produce multiple manifest files. + +#include <functional> +#include <memory> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief A rolling manifest writer that can produce multiple manifest files. +/// +/// As opposed to ManifestWriter, a rolling writer could produce multiple manifest +/// files. +class ICEBERG_EXPORT RollingManifestWriter { + public: + /// \brief Factory function type for creating ManifestWriter instances. + using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>()>; + + /// \brief Construct a rolling manifest writer. + /// \param manifest_writer_factory Factory function to create new ManifestWriter + /// instances. + /// \param target_file_size_in_bytes Target file size in bytes. 
When the current + /// file reaches this size (and row count is a multiple of 250), a new file + /// will be created. + RollingManifestWriter(ManifestWriterFactory manifest_writer_factory, + int64_t target_file_size_in_bytes); + + ~RollingManifestWriter(); + + /// \brief Add an added entry for a file. + /// + /// \param file a data file + /// \return Status::OK() if the entry was written successfully + /// \note The entry's snapshot ID will be this manifest's snapshot ID. The + /// entry's data sequence number will be the provided data sequence number. + /// The entry's file sequence number will be assigned at commit. + Status WriteAddedEntry(std::shared_ptr<DataFile> file, + std::optional<int64_t> data_sequence_number = std::nullopt); + + /// \brief Add an existing entry for a file. + /// + /// \param file an existing data file + /// \param file_snapshot_id snapshot ID when the data file was added to the table + /// \param data_sequence_number a data sequence number of the file (assigned when + /// the file was added) + /// \param file_sequence_number a file sequence number (assigned when the file + /// was added) + /// \return Status::OK() if the entry was written successfully + /// \note The original data and file sequence numbers, snapshot ID, which were + /// assigned at commit, must be preserved when adding an existing entry. + Status WriteExistingEntry(std::shared_ptr<DataFile> file, int64_t file_snapshot_id, + int64_t data_sequence_number, + std::optional<int64_t> file_sequence_number = std::nullopt); + + /// \brief Add a delete entry for a file. + /// + /// \param file a deleted data file + /// \param data_sequence_number a data sequence number of the file (assigned when + /// the file was added) + /// \param file_sequence_number a file sequence number (assigned when the file + /// was added) + /// \return Status::OK() if the entry was written successfully + /// \note The entry's snapshot ID will be this manifest's snapshot ID. 
However, + /// the original data and file sequence numbers of the file must be preserved + /// when the file is marked as deleted. + Status WriteDeletedEntry(std::shared_ptr<DataFile> file, int64_t data_sequence_number, + std::optional<int64_t> file_sequence_number = std::nullopt); + + /// \brief Close the rolling manifest writer. + Status Close(); + + /// \brief Get the list of manifest files produced by this writer. + /// \return A vector of ManifestFile objects + /// \note Only valid after the writer is closed. + Result<std::vector<ManifestFile>> ToManifestFiles() const; + + private: + /// \brief Get or create the current writer, rolling to a new file if needed. + /// \return The current ManifestWriter, or an error if creation fails + Result<ManifestWriter*> CurrentWriter(); + + /// \brief Check if we should roll to a new file. + /// + /// This method checks if the current file has reached the target size + /// or the number of rows has reached the threshold. If so, it rolls to a new file. + bool ShouldRollToNewFile() const; + + /// \brief Close the current writer and add its ManifestFile to the list. + Status CloseCurrentWriter(); + + static constexpr int64_t kRowsDivisor = 250; Review Comment: Please add a comment to explain it. ########## src/iceberg/manifest/rolling_manifest_writer.cc: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/rolling_manifest_writer.h" + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +RollingManifestWriter::RollingManifestWriter( + ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes) + : manifest_writer_factory_(std::move(manifest_writer_factory)), + target_file_size_in_bytes_(target_file_size_in_bytes) {} + +RollingManifestWriter::~RollingManifestWriter() { + // Ensure we close the current writer if not already closed + if (!closed_) { + (void)CloseCurrentWriter(); Review Comment: BTW, why not call `Close()` directly so you don't need to check `closed_`? ########## src/iceberg/manifest/rolling_manifest_writer.h: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/rolling_manifest_writer.h +/// Rolling manifest writer that can produce multiple manifest files. + +#include <functional> +#include <memory> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief A rolling manifest writer that can produce multiple manifest files. +/// +/// As opposed to ManifestWriter, a rolling writer could produce multiple manifest +/// files. Review Comment: ```suggestion ``` I think they are unnecessary. ########## src/iceberg/manifest/rolling_manifest_writer.cc: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/manifest/rolling_manifest_writer.h" + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +RollingManifestWriter::RollingManifestWriter( + ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes) + : manifest_writer_factory_(std::move(manifest_writer_factory)), + target_file_size_in_bytes_(target_file_size_in_bytes) {} + +RollingManifestWriter::~RollingManifestWriter() { + // Ensure we close the current writer if not already closed + if (!closed_) { + (void)CloseCurrentWriter(); + } +} + +Status RollingManifestWriter::WriteAddedEntry( + std::shared_ptr<DataFile> file, std::optional<int64_t> data_sequence_number) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED( + writer->WriteAddedEntry(std::move(file), data_sequence_number)); + current_file_rows_++; + return {}; +} + +Status RollingManifestWriter::WriteExistingEntry( + std::shared_ptr<DataFile> file, int64_t file_snapshot_id, + int64_t data_sequence_number, std::optional<int64_t> file_sequence_number) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry( + std::move(file), file_snapshot_id, data_sequence_number, file_sequence_number)); + current_file_rows_++; + return {}; +} + +Status RollingManifestWriter::WriteDeletedEntry( + std::shared_ptr<DataFile> file, int64_t data_sequence_number, + std::optional<int64_t> file_sequence_number) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry( + std::move(file), data_sequence_number, file_sequence_number)); + current_file_rows_++; + return {}; +} + +Status RollingManifestWriter::Close() { + if (!closed_) { + ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter()); + closed_ = true; + } + return {}; +} + +Result<std::vector<ManifestFile>> RollingManifestWriter::ToManifestFiles() const { + 
if (!closed_) { + return Invalid("Cannot get ManifestFile list from unclosed writer"); + } + return manifest_files_; +} + +Result<ManifestWriter*> RollingManifestWriter::CurrentWriter() { + if (current_writer_ == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_()); + } else if (ShouldRollToNewFile()) { + ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter()); + ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_()); + } + + return current_writer_.get(); +} + +bool RollingManifestWriter::ShouldRollToNewFile() const { + if (current_writer_ == nullptr) { + return false; + } + // Roll when row count is a multiple of ROWS_DIVISOR and file size >= target + if (current_file_rows_ % kRowsDivisor == 0) { + auto length_result = current_writer_->length(); + if (length_result.has_value()) { + return length_result.value() >= target_file_size_in_bytes_; + } + // If we can't get the length, don't roll Review Comment: Not sure if this is a good idea. What if it is a fatal error returned by the underlying writer? ########## src/iceberg/manifest/rolling_manifest_writer.cc: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/manifest/rolling_manifest_writer.h" + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +RollingManifestWriter::RollingManifestWriter( + ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes) + : manifest_writer_factory_(std::move(manifest_writer_factory)), + target_file_size_in_bytes_(target_file_size_in_bytes) {} + +RollingManifestWriter::~RollingManifestWriter() { + // Ensure we close the current writer if not already closed + if (!closed_) { + (void)CloseCurrentWriter(); Review Comment: ```suggestion std::ignore = CloseCurrentWriter(); ``` ########## src/iceberg/manifest/rolling_manifest_writer.h: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/manifest/rolling_manifest_writer.h +/// Rolling manifest writer that can produce multiple manifest files. 
+ +#include <functional> +#include <memory> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/result.h" + +namespace iceberg { + +/// \brief A rolling manifest writer that can produce multiple manifest files. +/// +/// As opposed to ManifestWriter, a rolling writer could produce multiple manifest +/// files. +class ICEBERG_EXPORT RollingManifestWriter { + public: + /// \brief Factory function type for creating ManifestWriter instances. + using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>()>; + + /// \brief Construct a rolling manifest writer. + /// \param manifest_writer_factory Factory function to create new ManifestWriter + /// instances. + /// \param target_file_size_in_bytes Target file size in bytes. When the current + /// file reaches this size (and row count is a multiple of 250), a new file + /// will be created. + RollingManifestWriter(ManifestWriterFactory manifest_writer_factory, + int64_t target_file_size_in_bytes); + + ~RollingManifestWriter(); + + /// \brief Add an added entry for a file. + /// + /// \param file a data file + /// \return Status::OK() if the entry was written successfully Review Comment: Unrelated to this PR: I think we don't have a `Status::OK()` method. We may need to fix all these comments so they don't confuse readers. ########## src/iceberg/manifest/rolling_manifest_writer.cc: ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/manifest/rolling_manifest_writer.h" + +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +RollingManifestWriter::RollingManifestWriter( + ManifestWriterFactory manifest_writer_factory, int64_t target_file_size_in_bytes) + : manifest_writer_factory_(std::move(manifest_writer_factory)), + target_file_size_in_bytes_(target_file_size_in_bytes) {} + +RollingManifestWriter::~RollingManifestWriter() { + // Ensure we close the current writer if not already closed + if (!closed_) { + (void)CloseCurrentWriter(); + } +} + +Status RollingManifestWriter::WriteAddedEntry( + std::shared_ptr<DataFile> file, std::optional<int64_t> data_sequence_number) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED( + writer->WriteAddedEntry(std::move(file), data_sequence_number)); + current_file_rows_++; + return {}; +} + +Status RollingManifestWriter::WriteExistingEntry( + std::shared_ptr<DataFile> file, int64_t file_snapshot_id, + int64_t data_sequence_number, std::optional<int64_t> file_sequence_number) { + ICEBERG_ASSIGN_OR_RAISE(auto* writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry( + std::move(file), file_snapshot_id, data_sequence_number, file_sequence_number)); + current_file_rows_++; + return {}; +} + +Status RollingManifestWriter::WriteDeletedEntry( + std::shared_ptr<DataFile> file, int64_t data_sequence_number, + std::optional<int64_t> file_sequence_number) { + ICEBERG_ASSIGN_OR_RAISE(auto* 
writer, CurrentWriter()); + ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry( + std::move(file), data_sequence_number, file_sequence_number)); + current_file_rows_++; + return {}; +} + +Status RollingManifestWriter::Close() { + if (!closed_) { + ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter()); + closed_ = true; + } + return {}; +} + +Result<std::vector<ManifestFile>> RollingManifestWriter::ToManifestFiles() const { + if (!closed_) { + return Invalid("Cannot get ManifestFile list from unclosed writer"); + } + return manifest_files_; +} + +Result<ManifestWriter*> RollingManifestWriter::CurrentWriter() { + if (current_writer_ == nullptr) { + ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_()); + } else if (ShouldRollToNewFile()) { + ICEBERG_RETURN_UNEXPECTED(CloseCurrentWriter()); + ICEBERG_ASSIGN_OR_RAISE(current_writer_, manifest_writer_factory_()); + } + + return current_writer_.get(); +} + +bool RollingManifestWriter::ShouldRollToNewFile() const { + if (current_writer_ == nullptr) { + return false; + } + // Roll when row count is a multiple of ROWS_DIVISOR and file size >= target Review Comment: ```suggestion // Roll when row count is a multiple of the divisor and file size >= target ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
