Copilot commented on code in PR #92:
URL: https://github.com/apache/paimon-cpp/pull/92#discussion_r3432668179


##########
src/paimon/core/operation/key_value_file_store_write.h:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "paimon/core/mergetree/compact/merge_function_wrapper.h"
+#include "paimon/core/mergetree/compact/merge_tree_compact_manager_factory.h"
+#include "paimon/core/operation/abstract_file_store_write.h"
+#include "paimon/core/utils/batch_writer.h"

Review Comment:
   This file includes `paimon/core/operation/abstract_file_store_write.h`, but 
that header is not present in the repository checkout. This will break 
compilation unless the base class header/source is added in this PR or the 
include is corrected to the existing location.



##########
src/paimon/core/operation/append_only_file_store_write.cpp:
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/append_only_file_store_write.h"
+
+#include <vector>
+
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/utils/arrow/arrow_utils.h"
+#include "paimon/core/append/append_only_writer.h"
+#include "paimon/core/append/bucketed_append_compact_manager.h"
+#include "paimon/core/compact/noop_compact_manager.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/io/data_file_meta.h"
+#include "paimon/core/io/data_file_path_factory.h"
+#include "paimon/core/io/data_file_writer.h"
+#include "paimon/core/io/rolling_file_writer.h"
+#include "paimon/core/manifest/manifest_file.h"
+#include "paimon/core/manifest/manifest_list.h"
+#include "paimon/core/operation/append_only_file_store_scan.h"
+#include "paimon/core/operation/file_store_scan.h"
+#include "paimon/core/operation/internal_read_context.h"
+#include "paimon/core/operation/raw_file_split_read.h"
+#include "paimon/core/operation/restore_files.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/snapshot.h"
+#include "paimon/core/utils/file_store_path_factory.h"
+#include "paimon/core/utils/snapshot_manager.h"
+#include "paimon/executor.h"
+#include "paimon/logging.h"
+#include "paimon/read_context.h"
+#include "paimon/result.h"
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+class DataFilePathFactory;
+class MemoryPool;
+class SchemaManager;
+
+AppendOnlyFileStoreWrite::AppendOnlyFileStoreWrite(
+    const std::shared_ptr<FileStorePathFactory>& file_store_path_factory,
+    const std::shared_ptr<SnapshotManager>& snapshot_manager,
+    const std::shared_ptr<SchemaManager>& schema_manager, const std::string& 
commit_user,
+    const std::string& root_path, const std::shared_ptr<TableSchema>& 
table_schema,
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<arrow::Schema>& write_schema,
+    const std::shared_ptr<arrow::Schema>& partition_schema,
+    const std::shared_ptr<BucketedDvMaintainer::Factory>& 
dv_maintainer_factory,
+    const std::shared_ptr<IOManager>& io_manager, const CoreOptions& options,
+    bool ignore_previous_files, bool is_streaming_mode, bool 
ignore_num_bucket_check,
+    const std::shared_ptr<Executor>& executor, const 
std::shared_ptr<MemoryPool>& pool)
+    : AbstractFileStoreWrite(file_store_path_factory, snapshot_manager, 
schema_manager, commit_user,
+                             root_path, table_schema, schema, write_schema, 
partition_schema,
+                             dv_maintainer_factory, io_manager, options, 
ignore_previous_files,
+                             is_streaming_mode, ignore_num_bucket_check, 
executor, pool),
+      logger_(Logger::GetLogger("AppendOnlyFileStoreWrite")) {
+    write_cols_ = write_schema->field_names();
+    auto schemas = BlobUtils::SeparateBlobSchema(schema_);
+    if (schemas.blob_schema && schemas.blob_schema->num_fields() > 0) {
+        with_blob_ = true;
+    }
+    // optimize write_cols to null in following cases:
+    // 1. write_schema contains all columns
+    // 2. TODO(xinyu.lxy) write_schema contains all columns and append _ROW_ID 
& _SEQUENCE_NUMBER
+    // cols
+    if (schema->Equals(write_schema)) {
+        write_cols_ = std::nullopt;
+    }
+}
+
+AppendOnlyFileStoreWrite::~AppendOnlyFileStoreWrite() = default;
+
+Result<std::unique_ptr<FileStoreScan>> 
AppendOnlyFileStoreWrite::CreateFileStoreScan(
+    const std::shared_ptr<ScanFilter>& scan_filter) const {
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr<ManifestList> manifest_list,
+        ManifestList::Create(options_.GetFileSystem(), 
options_.GetManifestFormat(),
+                             options_.GetManifestCompression(), 
file_store_path_factory_, pool_));
+    PAIMON_ASSIGN_OR_RAISE(
+        std::shared_ptr<ManifestFile> manifest_file,
+        ManifestFile::Create(options_.GetFileSystem(), 
options_.GetManifestFormat(),
+                             options_.GetManifestCompression(), 
file_store_path_factory_,
+                             options_.GetManifestTargetFileSize(), pool_, 
options_,
+                             partition_schema_));
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStoreScan> scan,
+                           AppendOnlyFileStoreScan::Create(
+                               snapshot_manager_, schema_manager_, 
manifest_list, manifest_file,
+                               table_schema_, schema_, scan_filter, options_, 
executor_, pool_));
+    return scan;
+}
+
+Result<std::vector<std::shared_ptr<DataFileMeta>>> 
AppendOnlyFileStoreWrite::CompactRewrite(
+    const BinaryRow& partition, int32_t bucket, DeletionVector::Factory 
dv_factory,
+    const std::vector<std::shared_ptr<DataFileMeta>>& to_compact,
+    const std::shared_ptr<CancellationController>& cancellation_controller) {
+    if (to_compact.empty()) {
+        return std::vector<std::shared_ptr<DataFileMeta>>{};
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<BatchReader> reader,
+                           CreateFilesReader(partition, bucket, dv_factory, 
to_compact));
+    auto rewriter =
+        std::make_unique<RollingFileWriter<::ArrowArray*, 
std::shared_ptr<DataFileMeta>>>(
+            options_.GetTargetFileSize(/*has_primary_key=*/false),
+            GetDataFileWriterCreator(partition, bucket, write_schema_, 
write_cols_, to_compact));
+
+    ScopeGuard reader_guard([&]() {
+        if (reader) {
+            reader->Close();
+        }
+    });
+
+    ScopeGuard rewriter_guard([&]() {
+        if (rewriter) {
+            (void)rewriter->Close();
+        }
+    });
+
+    while (true) {
+        if (cancellation_controller->IsCancelled()) {
+            return Status::Cancelled("Compaction cancelled while rewriting 
files.");
+        }
+        PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatch batch, 
reader->NextBatch());
+        if (BatchReader::IsEofBatch(batch)) {
+            break;
+        }
+        auto& [c_array, c_schema] = batch;
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> 
arrow_array,
+                                          arrow::ImportArray(c_array.get(), 
c_schema.get()));
+        auto struct_array = 
std::dynamic_pointer_cast<arrow::StructArray>(arrow_array);
+        if (!struct_array) {
+            return Status::Invalid(
+                "cannot cast array to StructArray in 
CompleteRowKindBatchReader");
+        }
+        PAIMON_ASSIGN_OR_RAISE(struct_array, 
ArrowUtils::RemoveFieldFromStructArray(
+                                                 struct_array, 
SpecialFields::ValueKind().Name()));
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(
+            arrow::ExportArray(*struct_array, c_array.get(), c_schema.get()));
+        ArrowSchemaRelease(c_schema.get());
+        ScopeGuard guard([array = c_array.get()]() { ArrowArrayRelease(array); 
});
+        PAIMON_RETURN_NOT_OK(rewriter->Write(c_array.get()));
+        guard.Release();
+    }
+    rewriter_guard.Release();
+    PAIMON_RETURN_NOT_OK(rewriter->Close());
+    return rewriter->GetResult();
+}
+
+Result<std::shared_ptr<BatchWriter>> AppendOnlyFileStoreWrite::CreateWriter(
+    const BinaryRow& partition, int32_t bucket,
+    const std::vector<std::shared_ptr<DataFileMeta>>& restore_data_files,
+    int64_t restore_max_seq_number, const 
std::shared_ptr<BucketedDvMaintainer>& dv_maintainer) {
+    PAIMON_LOG_DEBUG(logger_, "Creating append only writer for partition %s, 
bucket %d",
+                     partition.ToString().c_str(), bucket);
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<DataFilePathFactory> 
data_file_path_factory,
+                           
file_store_path_factory_->CreateDataFilePathFactory(partition, bucket));
+
+    std::shared_ptr<CompactManager> compact_manager;
+    auto schemas = BlobUtils::SeparateBlobSchema(write_schema_);
+    if (options_.WriteOnly() || options_.DataEvolutionEnabled() || 
options_.GetBucket() == -1 ||
+        with_blob_) {
+        compact_manager = std::make_shared<NoopCompactManager>();
+    } else {
+        auto dv_factory =
+            [dv_maintainer](
+                const std::string& file_name) -> 
Result<std::shared_ptr<DeletionVector>> {
+            if (dv_maintainer) {
+                return dv_maintainer->DeletionVectorOf(file_name).value_or(
+                    std::shared_ptr<DeletionVector>());
+            }
+            return std::shared_ptr<DeletionVector>();
+        };
+        auto cancellation_controller = 
std::make_shared<CancellationController>();
+
+        auto rewriter = [this, partition, bucket, dv_factory, 
cancellation_controller](
+                            const std::vector<std::shared_ptr<DataFileMeta>>& 
to_compact)
+            -> Result<std::vector<std::shared_ptr<DataFileMeta>>> {
+            return CompactRewrite(partition, bucket, dv_factory, to_compact,
+                                  cancellation_controller);
+        };
+
+        compact_manager = std::make_shared<BucketedAppendCompactManager>(
+            compact_executor_, restore_data_files, dv_maintainer,
+            options_.GetCompactionMinFileNum(),
+            options_.GetTargetFileSize(/*has_primary_key=*/false),
+            options_.GetCompactionFileSize(/*has_primary_key=*/false),
+            options_.CompactionForceRewriteAllFiles(), rewriter,
+            compaction_metrics_->CreateReporter(partition, bucket), 
cancellation_controller);
+    }
+
+    auto writer = std::make_shared<AppendOnlyWriter>(
+        options_, table_schema_->Id(), write_schema_, write_cols_, 
restore_max_seq_number,
+        data_file_path_factory, compact_manager, pool_);
+    return std::shared_ptr<BatchWriter>(writer);
+}
+
+AppendOnlyFileStoreWrite::SingleFileWriterCreator
+AppendOnlyFileStoreWrite::GetDataFileWriterCreator(
+    const BinaryRow& partition, int32_t bucket, const 
std::shared_ptr<arrow::Schema>& schema,
+    const std::optional<std::vector<std::string>>& write_cols,
+    const std::vector<std::shared_ptr<DataFileMeta>>& to_compact) const {
+    return
+        [this, partition, bucket, schema, write_cols, to_compact]()
+            -> Result<
+                std::unique_ptr<SingleFileWriter<::ArrowArray*, 
std::shared_ptr<DataFileMeta>>>> {
+            ::ArrowSchema arrow_schema;
+            ScopeGuard guard([&arrow_schema]() { 
ArrowSchemaRelease(&arrow_schema); });
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, 
&arrow_schema));
+            auto format = options_.GetFileFormat();
+            PAIMON_ASSIGN_OR_RAISE(
+                std::shared_ptr<WriterBuilder> writer_builder,
+                format->CreateWriterBuilder(&arrow_schema, 
options_.GetWriteBatchSize()));
+            writer_builder->WithMemoryPool(pool_);
+
+            PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, 
&arrow_schema));
+            PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FormatStatsExtractor> 
stats_extractor,
+                                   
format->CreateStatsExtractor(&arrow_schema));

Review Comment:
   `arrow::ExportSchema(*schema, &arrow_schema)` is called twice on the same 
`ArrowSchema` without an intermediate `ArrowSchemaRelease`. Unless 
`CreateWriterBuilder` takes ownership of the struct and releases it, the second 
call is redundant and overwrites the first exported schema, leaking it. In that 
case, reuse the already-exported `arrow_schema` for `CreateStatsExtractor` 
instead of exporting again.



##########
src/paimon/core/operation/append_only_file_store_write_test.cpp:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/append_only_file_store_write.h"
+
+#include <cstddef>
+#include <map>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/catalog/catalog.h"
+#include "paimon/catalog/identifier.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/data/binary_row_writer.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/io/data_file_meta.h"
+#include "paimon/core/operation/restore_files.h"
+#include "paimon/core/snapshot.h"
+#include "paimon/core/utils/snapshot_manager.h"
+#include "paimon/file_store_write.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/record_batch.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/write_context.h"
+
+namespace paimon::test {
+
+class AppendOnlyFileStoreWriteTest : public testing::Test {
+ public:
+    void SetUp() override {
+        fields_ = {arrow::field("f0", arrow::boolean()),
+                   arrow::field("f1", arrow::int8()),
+                   arrow::field("f2", arrow::int8()),
+                   arrow::field("f3", arrow::int16()),
+                   arrow::field("f4", arrow::int16()),
+                   arrow::field("f5", arrow::int32()),
+                   arrow::field("f6", arrow::int32()),
+                   arrow::field("f7", arrow::int64()),
+                   arrow::field("f8", arrow::int64()),
+                   arrow::field("f9", arrow::float32()),
+                   arrow::field("f10", arrow::float64()),
+                   arrow::field("f11", arrow::utf8()),
+                   arrow::field("f12", arrow::binary()),
+                   arrow::field("non-partition-field", arrow::int32())};
+        commit_user_ = "test_commit_user";
+    }
+
+ private:
+    arrow::FieldVector fields_;
+    std::string commit_user_;
+};
+
+TEST_F(AppendOnlyFileStoreWriteTest, TestWriteWithInvalidBatch) {
+    {
+        arrow::Schema typed_schema(fields_);
+        ::ArrowSchema schema;
+        ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok());
+        auto dir = UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir);
+
+        ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {}));
+        ASSERT_OK(catalog->CreateDatabase("foo", {}, 
/*ignore_if_exists=*/false));
+        ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, 
/*partition_keys=*/{},
+                                       /*primary_keys=*/{}, /*options=*/{},
+                                       /*ignore_if_exists=*/false));
+
+        WriteContextBuilder builder(PathUtil::JoinPath(dir->Str(), 
"foo.db/bar"), commit_user_);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, 
builder.Finish());
+        ASSERT_OK_AND_ASSIGN(auto file_store_write,
+                             FileStoreWrite::Create(std::move(write_context)));
+        ASSERT_NOK_WITH_MSG(file_store_write->Write(nullptr), "batch is null 
pointer");
+    }
+    {
+        arrow::Schema typed_schema(fields_);
+        ::ArrowSchema schema;
+        ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok());
+        auto dir = UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir);
+        ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {}));
+        ASSERT_OK(catalog->CreateDatabase("foo", {}, 
/*ignore_if_exists=*/false));
+        ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, 
/*partition_keys=*/{},
+                                       /*primary_keys=*/{}, /*options=*/{},
+                                       /*ignore_if_exists=*/false));
+
+        WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), 
"foo.db/bar"),
+                                            commit_user_);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, 
context_builder.Finish());
+        ASSERT_OK_AND_ASSIGN(auto file_store_write,
+                             FileStoreWrite::Create(std::move(write_context)));
+        auto array = std::make_shared<arrow::Array>();

Review Comment:
   `arrow::Array` is an abstract base class, so 
`std::make_shared<arrow::Array>()` will not compile. Use an uninitialized 
`std::shared_ptr<arrow::Array>` and let `StringBuilder::Finish` populate it.



##########
src/paimon/core/operation/key_value_file_store_write.h:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "paimon/core/mergetree/compact/merge_function_wrapper.h"
+#include "paimon/core/mergetree/compact/merge_tree_compact_manager_factory.h"
+#include "paimon/core/operation/abstract_file_store_write.h"

Review Comment:
   This file includes 
`paimon/core/mergetree/compact/merge_tree_compact_manager_factory.h` for 
`MergeTreeCompactManagerFactory`, but that header is not present in the 
repository checkout (and `MergeTreeCompactManagerFactory` cannot be found 
elsewhere). This will make the build fail; please add the missing header/source 
to the PR or update the include/type to the correct existing factory 
implementation.



##########
src/paimon/core/operation/append_only_file_store_write.h:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "arrow/type.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/core/compact/cancellation_controller.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/deletionvectors/deletion_vector.h"
+#include "paimon/core/io/single_file_writer.h"
+#include "paimon/core/operation/abstract_file_store_write.h"
+#include "paimon/core/table/bucket_mode.h"
+#include "paimon/file_store_write.h"

Review Comment:
   This file includes `paimon/core/operation/abstract_file_store_write.h`, but 
that header is not present in the repository checkout. As written, this 
prevents the new writer from compiling; please add the missing base class 
header/source to the PR or adjust the include to the actual location/name of 
the base class.



##########
src/paimon/core/operation/metrics/compaction_metrics.h:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <algorithm>
+#include <atomic>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
+
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/metrics/metrics_impl.h"
+
+namespace paimon {
+/// Metrics to measure a compaction.
+class CompactionMetrics {
+ public:
+    static constexpr int32_t kCompactionTimeWindow = 100;
+
+    static constexpr char MAX_LEVEL0_FILE_COUNT[] = "maxLevel0FileCount";
+    static constexpr char AVG_LEVEL0_FILE_COUNT[] = "avgLevel0FileCount";
+    static constexpr char AVG_COMPACTION_TIME[] = "avgCompactionTime";
+    static constexpr char COMPACTION_COMPLETED_COUNT[] = 
"compactionCompletedCount";
+    static constexpr char COMPACTION_TOTAL_COUNT[] = "compactionTotalCount";
+    static constexpr char COMPACTION_QUEUED_COUNT[] = "compactionQueuedCount";
+    static constexpr char MAX_COMPACTION_INPUT_SIZE[] = 
"maxCompactionInputSize";
+    static constexpr char MAX_COMPACTION_OUTPUT_SIZE[] = 
"maxCompactionOutputSize";
+    static constexpr char AVG_COMPACTION_INPUT_SIZE[] = 
"avgCompactionInputSize";
+    static constexpr char AVG_COMPACTION_OUTPUT_SIZE[] = 
"avgCompactionOutputSize";
+    static constexpr char MAX_TOTAL_FILE_SIZE[] = "maxTotalFileSize";
+    static constexpr char AVG_TOTAL_FILE_SIZE[] = "avgTotalFileSize";
+
+    class Reporter {
+     public:
+        Reporter(CompactionMetrics* metrics, const BinaryRow& partition, 
int32_t bucket)
+            : metrics_(metrics), partition_(partition), bucket_(bucket) {}
+
+        void ReportLevel0FileCount(int64_t count) {
+            level0_file_count_ = count;
+        }

Review Comment:
   `Reporter`’s metric fields (`level0_file_count_`, `compaction_*_size_`, 
`total_file_size_`) are written by `Report*()` without synchronization, but 
read concurrently by `CompactionMetrics::{Max,Avg}*()` while only holding 
`reporter_mutex_` (which does not protect the reporter’s internal fields). This 
creates a data race in multi-threaded compaction. Consider making these fields 
`std::atomic<int64_t>` (and using `.store()`/`.load()`), or guarding 
reads/writes with a shared mutex owned by `CompactionMetrics`.



##########
src/paimon/core/operation/key_value_file_store_write_test.cpp:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/key_value_file_store_write.h"
+
+#include <cstddef>
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/catalog/catalog.h"
+#include "paimon/catalog/identifier.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/table/sink/commit_message_impl.h"
+#include "paimon/file_store_write.h"
+#include "paimon/record_batch.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/test_helper.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/write_context.h"
+
+namespace paimon::test {
+
+class KeyValueFileStoreWriteTest : public ::testing::Test {
+ protected:
+    Result<std::unique_ptr<FileStoreWrite>> CreateSingleStringFileStoreWrite(
+        const std::map<std::string, std::string>& table_options, bool 
with_temp_directory) {
+        auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)};
+        arrow::Schema typed_schema(fields);
+        ::ArrowSchema schema;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(typed_schema, 
&schema));
+
+        auto dir = UniqueTestDirectory::Create();
+        if (!dir) {
+            return Status::Invalid("failed to create test directory");
+        }
+        PAIMON_ASSIGN_OR_RAISE(auto catalog, Catalog::Create(dir->Str(), {}));
+        PAIMON_RETURN_NOT_OK(catalog->CreateDatabase("foo", {}, 
/*ignore_if_exists=*/false));
+        PAIMON_RETURN_NOT_OK(catalog->CreateTable(Identifier("foo", "bar"), 
&schema,
+                                                  /*partition_keys=*/{},
+                                                  /*primary_keys=*/{"f0"}, 
table_options,
+                                                  /*ignore_if_exists=*/false));
+
+        WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), 
"foo.db/bar"), "test");
+        if (with_temp_directory) {
+            context_builder.WithTempDirectory(dir->Str());
+        }
+
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<WriteContext> write_context,
+                               context_builder.Finish());
+        return FileStoreWrite::Create(std::move(write_context));
+    }
+
+    Status WriteSingleStringRow(FileStoreWrite* file_store_write, int32_t 
bucket,
+                                const std::string& value) {
+        auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)};
+        auto struct_type = arrow::struct_(fields);
+        arrow::StructBuilder struct_builder(struct_type, 
arrow::default_memory_pool(),
+                                            
{std::make_shared<arrow::StringBuilder>()});
+        auto string_builder = 
static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Append());
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(string_builder->Append(value));
+
+        std::shared_ptr<arrow::Array> array;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Finish(&array));
+        ::ArrowArray arrow_array;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, 
&arrow_array));
+
+        RecordBatchBuilder batch_builder(&arrow_array);
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<RecordBatch> batch,
+                               batch_builder.SetBucket(bucket).Finish());
+        Status write_status = file_store_write->Write(std::move(batch));
+        if (!ArrowArrayIsReleased(&arrow_array)) {
+            ArrowArrayRelease(&arrow_array);
+        }
+        return write_status;
+    }
+};
+
+TEST_F(KeyValueFileStoreWriteTest, TestWriteWithInvalidBatch) {
+    auto fields = {
+        arrow::field("f0", arrow::boolean()),  arrow::field("f1", 
arrow::int8()),
+        arrow::field("f2", arrow::int8()),     arrow::field("f3", 
arrow::int16()),
+        arrow::field("f4", arrow::int16()),    arrow::field("f5", 
arrow::int32()),
+        arrow::field("f6", arrow::int32()),    arrow::field("f7", 
arrow::int64()),
+        arrow::field("f8", arrow::int64()),    arrow::field("f9", 
arrow::float32()),
+        arrow::field("f10", arrow::float64()), arrow::field("f11", 
arrow::utf8()),
+        arrow::field("f12", arrow::binary()),  
arrow::field("non-partition-field", arrow::int32())};
+    std::string commit_user = "test";
+    {
+        arrow::Schema typed_schema(fields);
+        ::ArrowSchema schema;
+        ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok());
+        auto dir = UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir);
+
+        ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {}));
+        ASSERT_OK(catalog->CreateDatabase("foo", {}, 
/*ignore_if_exists=*/false));
+        ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, 
/*partition_keys=*/{},
+                                       /*primary_keys=*/{"f1"}, 
/*options=*/{{"bucket", "1"}},
+                                       /*ignore_if_exists=*/false));
+
+        WriteContextBuilder builder(PathUtil::JoinPath(dir->Str(), 
"foo.db/bar"), commit_user);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, 
builder.Finish());
+        ASSERT_OK_AND_ASSIGN(auto file_store_write,
+                             FileStoreWrite::Create(std::move(write_context)));
+        ASSERT_NOK_WITH_MSG(file_store_write->Write(nullptr), "batch is null 
pointer");
+    }
+    {
+        arrow::Schema typed_schema(fields);
+        ::ArrowSchema schema;
+        ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok());
+        auto dir = UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir);
+        ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {}));
+        ASSERT_OK(catalog->CreateDatabase("foo", {}, 
/*ignore_if_exists=*/false));
+        ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, 
/*partition_keys=*/{},
+                                       /*primary_keys=*/{"f1"}, 
/*options=*/{{"bucket", "-2"}},
+                                       /*ignore_if_exists=*/false));
+
+        WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), 
"foo.db/bar"),
+                                            commit_user);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, 
context_builder.Finish());
+        ASSERT_OK_AND_ASSIGN(auto file_store_write,
+                             FileStoreWrite::Create(std::move(write_context)));
+        auto array = std::make_shared<arrow::Array>();

Review Comment:
   `arrow::Array` is abstract, so `std::make_shared<arrow::Array>()` will not 
compile. Declare a `std::shared_ptr<arrow::Array>` and let 
`StringBuilder::Finish` assign the concrete array instance.



##########
src/paimon/core/operation/key_value_file_store_write_test.cpp:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/operation/key_value_file_store_write.h"
+
+#include <cstddef>
+#include <map>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/array/array_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_nested.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/type.h"
+#include "gtest/gtest.h"
+#include "paimon/catalog/catalog.h"
+#include "paimon/catalog/identifier.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/table/sink/commit_message_impl.h"
+#include "paimon/file_store_write.h"
+#include "paimon/record_batch.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/test_helper.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/write_context.h"
+
+namespace paimon::test {
+
+class KeyValueFileStoreWriteTest : public ::testing::Test {
+ protected:
+    Result<std::unique_ptr<FileStoreWrite>> CreateSingleStringFileStoreWrite(
+        const std::map<std::string, std::string>& table_options, bool 
with_temp_directory) {
+        auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)};
+        arrow::Schema typed_schema(fields);
+        ::ArrowSchema schema;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(typed_schema, 
&schema));
+
+        auto dir = UniqueTestDirectory::Create();
+        if (!dir) {
+            return Status::Invalid("failed to create test directory");
+        }
+        PAIMON_ASSIGN_OR_RAISE(auto catalog, Catalog::Create(dir->Str(), {}));
+        PAIMON_RETURN_NOT_OK(catalog->CreateDatabase("foo", {}, 
/*ignore_if_exists=*/false));
+        PAIMON_RETURN_NOT_OK(catalog->CreateTable(Identifier("foo", "bar"), 
&schema,
+                                                  /*partition_keys=*/{},
+                                                  /*primary_keys=*/{"f0"}, 
table_options,
+                                                  /*ignore_if_exists=*/false));
+
+        WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), 
"foo.db/bar"), "test");
+        if (with_temp_directory) {
+            context_builder.WithTempDirectory(dir->Str());
+        }
+
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<WriteContext> write_context,
+                               context_builder.Finish());
+        return FileStoreWrite::Create(std::move(write_context));
+    }
+
+    Status WriteSingleStringRow(FileStoreWrite* file_store_write, int32_t 
bucket,
+                                const std::string& value) {
+        auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)};
+        auto struct_type = arrow::struct_(fields);
+        arrow::StructBuilder struct_builder(struct_type, 
arrow::default_memory_pool(),
+                                            
{std::make_shared<arrow::StringBuilder>()});
+        auto string_builder = 
static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0));
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Append());
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(string_builder->Append(value));
+
+        std::shared_ptr<arrow::Array> array;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Finish(&array));
+        ::ArrowArray arrow_array;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, 
&arrow_array));
+
+        RecordBatchBuilder batch_builder(&arrow_array);
+        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<RecordBatch> batch,
+                               batch_builder.SetBucket(bucket).Finish());
+        Status write_status = file_store_write->Write(std::move(batch));
+        if (!ArrowArrayIsReleased(&arrow_array)) {
+            ArrowArrayRelease(&arrow_array);
+        }
+        return write_status;
+    }
+};
+
+TEST_F(KeyValueFileStoreWriteTest, TestWriteWithInvalidBatch) {
+    auto fields = {
+        arrow::field("f0", arrow::boolean()),  arrow::field("f1", 
arrow::int8()),
+        arrow::field("f2", arrow::int8()),     arrow::field("f3", 
arrow::int16()),
+        arrow::field("f4", arrow::int16()),    arrow::field("f5", 
arrow::int32()),
+        arrow::field("f6", arrow::int32()),    arrow::field("f7", 
arrow::int64()),
+        arrow::field("f8", arrow::int64()),    arrow::field("f9", 
arrow::float32()),
+        arrow::field("f10", arrow::float64()), arrow::field("f11", 
arrow::utf8()),
+        arrow::field("f12", arrow::binary()),  
arrow::field("non-partition-field", arrow::int32())};
+    std::string commit_user = "test";
+    {
+        arrow::Schema typed_schema(fields);
+        ::ArrowSchema schema;
+        ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok());
+        auto dir = UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir);
+
+        ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {}));
+        ASSERT_OK(catalog->CreateDatabase("foo", {}, 
/*ignore_if_exists=*/false));
+        ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, 
/*partition_keys=*/{},
+                                       /*primary_keys=*/{"f1"}, 
/*options=*/{{"bucket", "1"}},
+                                       /*ignore_if_exists=*/false));
+
+        WriteContextBuilder builder(PathUtil::JoinPath(dir->Str(), 
"foo.db/bar"), commit_user);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, 
builder.Finish());
+        ASSERT_OK_AND_ASSIGN(auto file_store_write,
+                             FileStoreWrite::Create(std::move(write_context)));
+        ASSERT_NOK_WITH_MSG(file_store_write->Write(nullptr), "batch is null 
pointer");
+    }
+    {
+        arrow::Schema typed_schema(fields);
+        ::ArrowSchema schema;
+        ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok());
+        auto dir = UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir);
+        ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {}));
+        ASSERT_OK(catalog->CreateDatabase("foo", {}, 
/*ignore_if_exists=*/false));
+        ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, 
/*partition_keys=*/{},
+                                       /*primary_keys=*/{"f1"}, 
/*options=*/{{"bucket", "-2"}},
+                                       /*ignore_if_exists=*/false));
+
+        WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), 
"foo.db/bar"),
+                                            commit_user);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, 
context_builder.Finish());
+        ASSERT_OK_AND_ASSIGN(auto file_store_write,
+                             FileStoreWrite::Create(std::move(write_context)));
+        auto array = std::make_shared<arrow::Array>();
+        arrow::StringBuilder builder;
+        for (size_t j = 0; j < 100; j++) {
+            ASSERT_TRUE(builder.Append(std::to_string(j)).ok());
+        }
+        ASSERT_TRUE(builder.Finish(&array).ok());
+        ::ArrowArray arrow_array;
+        ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok());
+        RecordBatchBuilder batch_builder(&arrow_array);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch,
+                             batch_builder.SetBucket(1).Finish());
+        ASSERT_NOK_WITH_MSG(file_store_write->Write(std::move(batch)),
+                            "batch bucket is 1 while options bucket is -2");
+        ArrowArrayRelease(&arrow_array);
+    }
+    {
+        arrow::Schema typed_schema(fields);
+        ::ArrowSchema schema;
+        ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok());
+        auto dir = UniqueTestDirectory::Create();
+        ASSERT_TRUE(dir);
+        ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {}));
+        ASSERT_OK(catalog->CreateDatabase("foo", {}, 
/*ignore_if_exists=*/false));
+        ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, 
/*partition_keys=*/{},
+                                       /*primary_keys=*/{"f1"}, 
/*options=*/{{"bucket", "2"}},
+                                       /*ignore_if_exists=*/false));
+
+        WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), 
"foo.db/bar"),
+                                            commit_user);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, 
context_builder.Finish());
+        ASSERT_OK_AND_ASSIGN(auto file_store_write,
+                             FileStoreWrite::Create(std::move(write_context)));
+        auto array = std::make_shared<arrow::Array>();

Review Comment:
   `arrow::Array` cannot be instantiated directly (it is a polymorphic base 
class whose default constructor is protected), so 
`std::make_shared<arrow::Array>()` will not compile. Declare an empty 
`std::shared_ptr<arrow::Array>` and let `StringBuilder::Finish` assign the 
concrete array instance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to