Copilot commented on code in PR #81: URL: https://github.com/apache/paimon-cpp/pull/81#discussion_r3393764985
########## src/paimon/core/io/single_file_writer.h: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cassert> +#include <cstdint> +#include <functional> +#include <memory> +#include <string> +#include <utility> + +#include "arrow/c/abi.h" +#include "arrow/c/helpers.h" +#include "fmt/format.h" +#include "paimon/common/utils/arrow/arrow_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/file_writer.h" +#include "paimon/format/format_writer.h" +#include "paimon/format/writer_builder.h" +#include "paimon/fs/file_system.h" +#include "paimon/logging.h" +#include "paimon/macros.h" +#include "paimon/record_batch.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +class RecordBatch; +class Metrics; + +/// A `FileWriter` to produce a single file. +/// +/// <T> type of records to write. +/// <R> is the type of result to produce after writing a file. +template <typename T, typename R> +class SingleFileWriter : public FileWriter<T, R> { + public: + /// Abort executor to just have reference of path instead of whole writer. + class AbortExecutor { + public: + AbortExecutor(const std::shared_ptr<FileSystem>& fs, const std::string& path) + : fs_(fs), path_(path), logger_(Logger::GetLogger("AbortExecutor")) {} + + void Abort() { + if (fs_) { + auto status = fs_->Delete(path_); + if (!status.ok()) { + PAIMON_LOG_WARN(logger_, "Exception occurs when deleting %s: %s", path_.c_str(), + status.ToString().c_str()); + } + } + } + + private: + std::shared_ptr<FileSystem> fs_; + std::string path_; + std::shared_ptr<Logger> logger_; + }; + + SingleFileWriter(const std::string& compression, + std::function<Status(T, ::ArrowArray*)> converter) + : compression_(compression), + converter_(converter), + logger_(Logger::GetLogger("SingleFileWriter")) {} + + virtual Status Init(const std::shared_ptr<FileSystem>& fs, const std::string& path, + const std::shared_ptr<WriterBuilder>& writer_builder); + + Status Write(T record) override; + + int64_t RecordCount() const override { + return record_count_; + } + void Abort() override; + Status Close() override; + + std::shared_ptr<Metrics> GetMetrics() const override { + if (writer_) { + return writer_->GetWriterMetrics(); + } + return nullptr; + } + + Result<bool> ReachTargetSize(bool suggested_check, int64_t target_size); + + Result<AbortExecutor> GetAbortExecutor() const { + if (closed_ == false) { + return Status::Invalid("Writer should be closed!"); + } + return AbortExecutor(fs_, path_); + } + + std::string GetPath() const { + return path_; + } + + protected: + int64_t output_bytes_ = -1; + std::string compression_; + std::function<Status(T, ArrowArray*)> converter_; + std::shared_ptr<FileSystem> fs_; + std::shared_ptr<OutputStream> out_; // nullptr for DirectWriterBuilder + bool closed_ = false; + std::string path_; + + private: + int64_t record_count_ = 0; + std::unique_ptr<FormatWriter> writer_; + + std::unique_ptr<Logger> logger_; +}; + +template <typename T, typename R> +Status SingleFileWriter<T, R>::Init(const std::shared_ptr<FileSystem>& fs, const std::string& path, + const std::shared_ptr<WriterBuilder>& writer_builder) { + ScopeGuard guard([this]() -> void { + this->Abort(); + PAIMON_LOG_WARN(logger_, + "Exception occurs when initializing single file writer %s. Cleaning up.", + path_.c_str()); + }); + path_ = path; + fs_ = fs; Review Comment: `ScopeGuard` is created before `path_`/`fs_` are assigned. If an error occurs early in `Init`, `Abort()` may attempt to delete `path_` while it is still empty/uninitialized, and the log will also print an empty path. Assign `path_`/`fs_` before creating the guard, or make the guard conditional on `fs_`/`path_` being set. ########## src/paimon/core/io/single_file_writer.h: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cassert> +#include <cstdint> +#include <functional> +#include <memory> +#include <string> +#include <utility> + +#include "arrow/c/abi.h" +#include "arrow/c/helpers.h" +#include "fmt/format.h" +#include "paimon/common/utils/arrow/arrow_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/file_writer.h" +#include "paimon/format/format_writer.h" +#include "paimon/format/writer_builder.h" +#include "paimon/fs/file_system.h" +#include "paimon/logging.h" +#include "paimon/macros.h" +#include "paimon/record_batch.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +class RecordBatch; +class Metrics; + +/// A `FileWriter` to produce a single file. +/// +/// <T> type of records to write. +/// <R> is the type of result to produce after writing a file. +template <typename T, typename R> +class SingleFileWriter : public FileWriter<T, R> { + public: + /// Abort executor to just have reference of path instead of whole writer. + class AbortExecutor { + public: + AbortExecutor(const std::shared_ptr<FileSystem>& fs, const std::string& path) + : fs_(fs), path_(path), logger_(Logger::GetLogger("AbortExecutor")) {} + + void Abort() { + if (fs_) { + auto status = fs_->Delete(path_); + if (!status.ok()) { + PAIMON_LOG_WARN(logger_, "Exception occurs when deleting %s: %s", path_.c_str(), + status.ToString().c_str()); + } + } + } + + private: + std::shared_ptr<FileSystem> fs_; + std::string path_; + std::shared_ptr<Logger> logger_; + }; + + SingleFileWriter(const std::string& compression, + std::function<Status(T, ::ArrowArray*)> converter) + : compression_(compression), + converter_(converter), + logger_(Logger::GetLogger("SingleFileWriter")) {} + + virtual Status Init(const std::shared_ptr<FileSystem>& fs, const std::string& path, + const std::shared_ptr<WriterBuilder>& writer_builder); + + Status Write(T record) override; + + int64_t RecordCount() const override { + return record_count_; + } + void Abort() override; + Status Close() override; + + std::shared_ptr<Metrics> GetMetrics() const override { + if (writer_) { + return writer_->GetWriterMetrics(); + } + return nullptr; + } + + Result<bool> ReachTargetSize(bool suggested_check, int64_t target_size); + + Result<AbortExecutor> GetAbortExecutor() const { + if (closed_ == false) { + return Status::Invalid("Writer should be closed!"); + } + return AbortExecutor(fs_, path_); + } Review Comment: The error message includes an exclamation mark (`"Writer should be closed!"`) but the new test asserts `"Writer should be closed"` (without `!`). This will likely make the test fail if `ASSERT_NOK_WITH_MSG` compares the full message. Align either the message here or the expected string in the test. ########## src/paimon/core/io/rolling_blob_file_writer.cpp: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/io/rolling_blob_file_writer.h" + +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/array/array_base.h" +#include "arrow/array/array_nested.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "fmt/format.h" +#include "fmt/ranges.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/macros.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { + +RollingBlobFileWriter::RollingBlobFileWriter( + int64_t target_file_size, + std::function<Result<std::unique_ptr<MainWriter>>()> create_file_writer, + const std::shared_ptr<arrow::Schema>& blob_schema, + MultipleBlobFileWriter::BlobWriterCreator blob_writer_creator, + const std::shared_ptr<arrow::DataType>& data_type) + : RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>(target_file_size, + create_file_writer), + blob_schema_(blob_schema), + blob_writer_creator_(std::move(blob_writer_creator)), + data_type_(data_type), + logger_(Logger::GetLogger("RollingBlobFileWriter")) {} + +Status RollingBlobFileWriter::Write(::ArrowArray* record) { + ScopeGuard guard([this]() -> void { this->Abort(); }); + // Open the current writer if write the first record or roll over happen before. + if (PAIMON_UNLIKELY(current_writer_ == nullptr)) { + PAIMON_RETURN_NOT_OK(OpenCurrentWriter()); + } + if (PAIMON_UNLIKELY(blob_writer_ == nullptr)) { + blob_writer_ = std::make_unique<MultipleBlobFileWriter>(blob_schema_, blob_writer_creator_); + } + int64_t record_count = record->length; + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array, + arrow::ImportArray(record, data_type_)); + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(arrow_array); + + PAIMON_ASSIGN_OR_RAISE(BlobUtils::SeparatedStructArrays separated_arrays, + BlobUtils::SeparateBlobArray(struct_array)); Review Comment: `struct_array` can be `nullptr` if the imported array isn’t a `StructArray`. Passing a null `struct_array` into `SeparateBlobArray` can lead to a crash or undefined behavior. Return a clear `Status::Invalid(...)` when the cast fails (and include the actual type where possible). ########## src/paimon/core/io/rolling_file_writer.h: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <utility> +#include <vector> + +#include "arrow/c/bridge.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/core/io/file_writer.h" +#include "paimon/core/io/single_file_writer.h" +#include "paimon/core/key_value.h" +#include "paimon/metrics.h" +#include "paimon/record_batch.h" + +namespace paimon { + +// Writer to roll over to a new file if the current size exceed the target file size. +template <typename T, typename R> +class RollingFileWriter : public FileWriter<T, std::vector<R>> { + public: + RollingFileWriter( + int64_t target_file_size, + std::function<Result<std::unique_ptr<SingleFileWriter<T, R>>>()> create_file_writer) + : target_file_size_(target_file_size), + create_file_writer(create_file_writer), + metrics_(std::make_shared<MetricsImpl>()), + logger_(Logger::GetLogger("RollingFileWriter")) {} Review Comment: The member `create_file_writer` is inconsistent with the surrounding naming convention (`target_file_size_`, `metrics_`, etc.). Rename it to `create_file_writer_` and initialize it via `std::move(create_file_writer)` to avoid an unnecessary `std::function` copy. ########## src/paimon/core/io/rolling_file_writer.h: ########## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> +#include <utility> +#include <vector> + +#include "arrow/c/bridge.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/core/io/file_writer.h" +#include "paimon/core/io/single_file_writer.h" +#include "paimon/core/key_value.h" +#include "paimon/metrics.h" +#include "paimon/record_batch.h" + +namespace paimon { + +// Writer to roll over to a new file if the current size exceed the target file size. +template <typename T, typename R> +class RollingFileWriter : public FileWriter<T, std::vector<R>> { + public: + RollingFileWriter( + int64_t target_file_size, + std::function<Result<std::unique_ptr<SingleFileWriter<T, R>>>()> create_file_writer) + : target_file_size_(target_file_size), + create_file_writer(create_file_writer), + metrics_(std::make_shared<MetricsImpl>()), + logger_(Logger::GetLogger("RollingFileWriter")) {} + + ~RollingFileWriter() = default; + + Status Write(T record) override; + void Abort() override; + Status Close() override; + Result<std::vector<R>> GetResult() override; + + int64_t RecordCount() const override { + return record_count_; + } + + std::shared_ptr<Metrics> GetMetrics() const override { + return metrics_; + } + + int64_t TargetFileSize() const { + return target_file_size_; + } + + protected: + static constexpr int32_t CHECK_ROLLING_RECORD_CNT = 1000; + + bool SuggestCheck(); + Result<bool> NeedRollingFile(); + Result<std::unique_ptr<SingleFileWriter<T, R>>> NewWriter(); + Status OpenCurrentWriter(); + + int64_t target_file_size_ = 0; + std::function<Result<std::unique_ptr<SingleFileWriter<T, R>>>()> create_file_writer; + std::shared_ptr<Metrics> metrics_; Review Comment: The member `create_file_writer` is inconsistent with the surrounding naming convention (`target_file_size_`, `metrics_`, etc.). Rename it to `create_file_writer_` and initialize it via `std::move(create_file_writer)` to avoid an unnecessary `std::function` copy. ########## src/paimon/core/io/rolling_blob_file_writer.cpp: ########## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/io/rolling_blob_file_writer.h" + +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/array/array_base.h" +#include "arrow/array/array_nested.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "fmt/format.h" +#include "fmt/ranges.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/macros.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { + +RollingBlobFileWriter::RollingBlobFileWriter( + int64_t target_file_size, + std::function<Result<std::unique_ptr<MainWriter>>()> create_file_writer, + const std::shared_ptr<arrow::Schema>& blob_schema, + MultipleBlobFileWriter::BlobWriterCreator blob_writer_creator, + const std::shared_ptr<arrow::DataType>& data_type) + : RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>(target_file_size, + create_file_writer), + blob_schema_(blob_schema), + blob_writer_creator_(std::move(blob_writer_creator)), + data_type_(data_type), + logger_(Logger::GetLogger("RollingBlobFileWriter")) {} + +Status RollingBlobFileWriter::Write(::ArrowArray* record) { + ScopeGuard guard([this]() -> void { this->Abort(); }); + // Open the current writer if write the first record or roll over happen before. + if (PAIMON_UNLIKELY(current_writer_ == nullptr)) { + PAIMON_RETURN_NOT_OK(OpenCurrentWriter()); + } + if (PAIMON_UNLIKELY(blob_writer_ == nullptr)) { + blob_writer_ = std::make_unique<MultipleBlobFileWriter>(blob_schema_, blob_writer_creator_); + } + int64_t record_count = record->length; + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array, + arrow::ImportArray(record, data_type_)); + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(arrow_array); + + PAIMON_ASSIGN_OR_RAISE(BlobUtils::SeparatedStructArrays separated_arrays, + BlobUtils::SeparateBlobArray(struct_array)); + // Write main (non-blob) data + ::ArrowArray c_main_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*separated_arrays.main_array, &c_main_array)); + ScopeGuard array_lifecycle_guard( + [&c_main_array]() -> void { ArrowArrayRelease(&c_main_array); }); + PAIMON_RETURN_NOT_OK(current_writer_->Write(&c_main_array)); + + // Write blob data via MultipleBlobFileWriter (each blob field independently) + ::ArrowArray c_blob_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*separated_arrays.blob_array, &c_blob_array)); + ScopeGuard blob_array_guard([&c_blob_array]() -> void { ArrowArrayRelease(&c_blob_array); }); + PAIMON_RETURN_NOT_OK(blob_writer_->Write(&c_blob_array)); + + record_count_ += record_count; + PAIMON_ASSIGN_OR_RAISE(bool need_rolling_file, NeedRollingFile()); + if (need_rolling_file) { + PAIMON_RETURN_NOT_OK(CloseCurrentWriter()); + } + guard.Release(); + return Status::OK(); +} + +Status RollingBlobFileWriter::CloseCurrentWriter() { + if (current_writer_ == nullptr) { + return Status::OK(); + } + if (blob_writer_ == nullptr) { + return Status::OK(); + } Review Comment: If `current_writer_` is non-null but `blob_writer_` is null, this returns OK without closing the current writer, which can leave file handles/resources open and can produce incomplete results. Close/abort `current_writer_` in this branch (or enforce an invariant via an assertion and return an error if it’s violated). ########## src/paimon/core/io/single_file_writer_test.cpp: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/io/single_file_writer.h" + +#include <map> +#include <utility> + +#include "arrow/api.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/ipc/json_simple.h" +#include "gtest/gtest.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/defs.h" +#include "paimon/format/file_format.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class SimpleSingleFileWriter : public SingleFileWriter<int32_t, bool> { + public: + SimpleSingleFileWriter(const std::string& compression, + std::function<Status(int32_t, ArrowArray*)> converter) + : SingleFileWriter<int32_t, bool>(compression, converter) {} + + Result<bool> GetResult() override { + return true; + } +}; + +TEST(SingleFileWriterTest, TestSimple) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string file_path = dir->Str() + "/single-file"; + auto data_type = arrow::struct_({arrow::field("col", arrow::int32())}); + auto converter = [&](int32_t value, ::ArrowArray* dest) -> Status { + std::string value_str = "[[" + std::to_string(value) + "]]"; + auto array = + arrow::ipc::internal::json::ArrayFromJSON(data_type, value_str.c_str()).ValueOrDie(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, dest)); Review Comment: Using `ValueOrDie()` will terminate the entire test process on failure, which makes failures harder to diagnose and can hide additional test results. Prefer asserting the Arrow result (e.g., assign via an `ASSERT_OK_AND_ASSIGN`-style helper) so failures are reported cleanly by gtest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
