Copilot commented on code in PR #92: URL: https://github.com/apache/paimon-cpp/pull/92#discussion_r3432668179
########## src/paimon/core/operation/key_value_file_store_write.h: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> +#include <memory> +#include <string> +#include <utility> + +#include "paimon/core/mergetree/compact/merge_function_wrapper.h" +#include "paimon/core/mergetree/compact/merge_tree_compact_manager_factory.h" +#include "paimon/core/operation/abstract_file_store_write.h" +#include "paimon/core/utils/batch_writer.h" Review Comment: This file includes `paimon/core/operation/abstract_file_store_write.h`, but that header is not present in the repository checkout. This will break compilation unless the base class header/source is added in this PR or the include is corrected to the existing location. ########## src/paimon/core/operation/append_only_file_store_write.cpp: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/append_only_file_store_write.h" + +#include <vector> + +#include "paimon/common/data/binary_row.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/utils/arrow/arrow_utils.h" +#include "paimon/core/append/append_only_writer.h" +#include "paimon/core/append/bucketed_append_compact_manager.h" +#include "paimon/core/compact/noop_compact_manager.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/data_file_path_factory.h" +#include "paimon/core/io/data_file_writer.h" +#include "paimon/core/io/rolling_file_writer.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/operation/append_only_file_store_scan.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/operation/internal_read_context.h" +#include "paimon/core/operation/raw_file_split_read.h" +#include "paimon/core/operation/restore_files.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/executor.h" +#include "paimon/logging.h" +#include "paimon/read_context.h" +#include "paimon/result.h" +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class 
DataFilePathFactory; +class MemoryPool; +class SchemaManager; + +AppendOnlyFileStoreWrite::AppendOnlyFileStoreWrite( + const std::shared_ptr<FileStorePathFactory>& file_store_path_factory, + const std::shared_ptr<SnapshotManager>& snapshot_manager, + const std::shared_ptr<SchemaManager>& schema_manager, const std::string& commit_user, + const std::string& root_path, const std::shared_ptr<TableSchema>& table_schema, + const std::shared_ptr<arrow::Schema>& schema, + const std::shared_ptr<arrow::Schema>& write_schema, + const std::shared_ptr<arrow::Schema>& partition_schema, + const std::shared_ptr<BucketedDvMaintainer::Factory>& dv_maintainer_factory, + const std::shared_ptr<IOManager>& io_manager, const CoreOptions& options, + bool ignore_previous_files, bool is_streaming_mode, bool ignore_num_bucket_check, + const std::shared_ptr<Executor>& executor, const std::shared_ptr<MemoryPool>& pool) + : AbstractFileStoreWrite(file_store_path_factory, snapshot_manager, schema_manager, commit_user, + root_path, table_schema, schema, write_schema, partition_schema, + dv_maintainer_factory, io_manager, options, ignore_previous_files, + is_streaming_mode, ignore_num_bucket_check, executor, pool), + logger_(Logger::GetLogger("AppendOnlyFileStoreWrite")) { + write_cols_ = write_schema->field_names(); + auto schemas = BlobUtils::SeparateBlobSchema(schema_); + if (schemas.blob_schema && schemas.blob_schema->num_fields() > 0) { + with_blob_ = true; + } + // optimize write_cols to null in following cases: + // 1. write_schema contains all columns + // 2. 
TODO(xinyu.lxy) write_schema contains all columns and append _ROW_ID & _SEQUENCE_NUMBER + // cols + if (schema->Equals(write_schema)) { + write_cols_ = std::nullopt; + } +} + +AppendOnlyFileStoreWrite::~AppendOnlyFileStoreWrite() = default; + +Result<std::unique_ptr<FileStoreScan>> AppendOnlyFileStoreWrite::CreateFileStoreScan( + const std::shared_ptr<ScanFilter>& scan_filter) const { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr<ManifestList> manifest_list, + ManifestList::Create(options_.GetFileSystem(), options_.GetManifestFormat(), + options_.GetManifestCompression(), file_store_path_factory_, pool_)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr<ManifestFile> manifest_file, + ManifestFile::Create(options_.GetFileSystem(), options_.GetManifestFormat(), + options_.GetManifestCompression(), file_store_path_factory_, + options_.GetManifestTargetFileSize(), pool_, options_, + partition_schema_)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileStoreScan> scan, + AppendOnlyFileStoreScan::Create( + snapshot_manager_, schema_manager_, manifest_list, manifest_file, + table_schema_, schema_, scan_filter, options_, executor_, pool_)); + return scan; +} + +Result<std::vector<std::shared_ptr<DataFileMeta>>> AppendOnlyFileStoreWrite::CompactRewrite( + const BinaryRow& partition, int32_t bucket, DeletionVector::Factory dv_factory, + const std::vector<std::shared_ptr<DataFileMeta>>& to_compact, + const std::shared_ptr<CancellationController>& cancellation_controller) { + if (to_compact.empty()) { + return std::vector<std::shared_ptr<DataFileMeta>>{}; + } + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<BatchReader> reader, + CreateFilesReader(partition, bucket, dv_factory, to_compact)); + auto rewriter = + std::make_unique<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>( + options_.GetTargetFileSize(/*has_primary_key=*/false), + GetDataFileWriterCreator(partition, bucket, write_schema_, write_cols_, to_compact)); + + ScopeGuard reader_guard([&]() { + if (reader) { + 
reader->Close(); + } + }); + + ScopeGuard rewriter_guard([&]() { + if (rewriter) { + (void)rewriter->Close(); + } + }); + + while (true) { + if (cancellation_controller->IsCancelled()) { + return Status::Cancelled("Compaction cancelled while rewriting files."); + } + PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatch batch, reader->NextBatch()); + if (BatchReader::IsEofBatch(batch)) { + break; + } + auto& [c_array, c_schema] = batch; + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array, + arrow::ImportArray(c_array.get(), c_schema.get())); + auto struct_array = std::dynamic_pointer_cast<arrow::StructArray>(arrow_array); + if (!struct_array) { + return Status::Invalid( + "cannot cast array to StructArray in CompleteRowKindBatchReader"); + } + PAIMON_ASSIGN_OR_RAISE(struct_array, ArrowUtils::RemoveFieldFromStructArray( + struct_array, SpecialFields::ValueKind().Name())); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*struct_array, c_array.get(), c_schema.get())); + ArrowSchemaRelease(c_schema.get()); + ScopeGuard guard([array = c_array.get()]() { ArrowArrayRelease(array); }); + PAIMON_RETURN_NOT_OK(rewriter->Write(c_array.get())); + guard.Release(); + } + rewriter_guard.Release(); + PAIMON_RETURN_NOT_OK(rewriter->Close()); + return rewriter->GetResult(); +} + +Result<std::shared_ptr<BatchWriter>> AppendOnlyFileStoreWrite::CreateWriter( + const BinaryRow& partition, int32_t bucket, + const std::vector<std::shared_ptr<DataFileMeta>>& restore_data_files, + int64_t restore_max_seq_number, const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer) { + PAIMON_LOG_DEBUG(logger_, "Creating append only writer for partition %s, bucket %d", + partition.ToString().c_str(), bucket); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<DataFilePathFactory> data_file_path_factory, + file_store_path_factory_->CreateDataFilePathFactory(partition, bucket)); + + std::shared_ptr<CompactManager> compact_manager; + auto schemas = 
BlobUtils::SeparateBlobSchema(write_schema_); + if (options_.WriteOnly() || options_.DataEvolutionEnabled() || options_.GetBucket() == -1 || + with_blob_) { + compact_manager = std::make_shared<NoopCompactManager>(); + } else { + auto dv_factory = + [dv_maintainer]( + const std::string& file_name) -> Result<std::shared_ptr<DeletionVector>> { + if (dv_maintainer) { + return dv_maintainer->DeletionVectorOf(file_name).value_or( + std::shared_ptr<DeletionVector>()); + } + return std::shared_ptr<DeletionVector>(); + }; + auto cancellation_controller = std::make_shared<CancellationController>(); + + auto rewriter = [this, partition, bucket, dv_factory, cancellation_controller]( + const std::vector<std::shared_ptr<DataFileMeta>>& to_compact) + -> Result<std::vector<std::shared_ptr<DataFileMeta>>> { + return CompactRewrite(partition, bucket, dv_factory, to_compact, + cancellation_controller); + }; + + compact_manager = std::make_shared<BucketedAppendCompactManager>( + compact_executor_, restore_data_files, dv_maintainer, + options_.GetCompactionMinFileNum(), + options_.GetTargetFileSize(/*has_primary_key=*/false), + options_.GetCompactionFileSize(/*has_primary_key=*/false), + options_.CompactionForceRewriteAllFiles(), rewriter, + compaction_metrics_->CreateReporter(partition, bucket), cancellation_controller); + } + + auto writer = std::make_shared<AppendOnlyWriter>( + options_, table_schema_->Id(), write_schema_, write_cols_, restore_max_seq_number, + data_file_path_factory, compact_manager, pool_); + return std::shared_ptr<BatchWriter>(writer); +} + +AppendOnlyFileStoreWrite::SingleFileWriterCreator +AppendOnlyFileStoreWrite::GetDataFileWriterCreator( + const BinaryRow& partition, int32_t bucket, const std::shared_ptr<arrow::Schema>& schema, + const std::optional<std::vector<std::string>>& write_cols, + const std::vector<std::shared_ptr<DataFileMeta>>& to_compact) const { + return + [this, partition, bucket, schema, write_cols, to_compact]() + -> Result< + 
std::unique_ptr<SingleFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>> { + ::ArrowSchema arrow_schema; + ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); }); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &arrow_schema)); + auto format = options_.GetFileFormat(); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr<WriterBuilder> writer_builder, + format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize())); + writer_builder->WithMemoryPool(pool_); + + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &arrow_schema)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FormatStatsExtractor> stats_extractor, + format->CreateStatsExtractor(&arrow_schema)); Review Comment: `arrow::ExportSchema(*schema, &arrow_schema)` is called twice on the same `ArrowSchema` without an intermediate `ArrowSchemaRelease`. This is redundant and can leak/overwrite the first exported schema. Reuse the already-exported `arrow_schema` for `CreateStatsExtractor` instead of exporting again. Note: this only applies if `CreateWriterBuilder` leaves the exported schema intact; if it takes ownership of (and releases) the `ArrowSchema` it is given, the second export is required and nothing is leaked, so please confirm its ownership contract before changing this. ########## src/paimon/core/operation/append_only_file_store_write_test.cpp: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/operation/append_only_file_store_write.h" + +#include <cstddef> +#include <map> +#include <vector> + +#include "arrow/array/array_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/operation/restore_files.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/file_store_write.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/record_batch.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" + +namespace paimon::test { + +class AppendOnlyFileStoreWriteTest : public testing::Test { + public: + void SetUp() override { + fields_ = {arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), + arrow::field("f5", arrow::int32()), + arrow::field("f6", arrow::int32()), + arrow::field("f7", arrow::int64()), + arrow::field("f8", arrow::int64()), + arrow::field("f9", arrow::float32()), + arrow::field("f10", arrow::float64()), + arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), + arrow::field("non-partition-field", arrow::int32())}; + commit_user_ = "test_commit_user"; + } + + private: + arrow::FieldVector fields_; + std::string commit_user_; +}; + +TEST_F(AppendOnlyFileStoreWriteTest, TestWriteWithInvalidBatch) { + { + arrow::Schema typed_schema(fields_); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); 
+ auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, /*options=*/{}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), commit_user_); + ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + ASSERT_NOK_WITH_MSG(file_store_write->Write(nullptr), "batch is null pointer"); + } + { + arrow::Schema typed_schema(fields_); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, /*options=*/{}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + commit_user_); + ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + auto array = std::make_shared<arrow::Array>(); Review Comment: `arrow::Array` is an abstract base class, so `std::make_shared<arrow::Array>()` will not compile. Declare a default-constructed (null) `std::shared_ptr<arrow::Array>` and let `StringBuilder::Finish` populate it. 
########## src/paimon/core/operation/key_value_file_store_write.h: ########## @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> +#include <memory> +#include <string> +#include <utility> + +#include "paimon/core/mergetree/compact/merge_function_wrapper.h" +#include "paimon/core/mergetree/compact/merge_tree_compact_manager_factory.h" +#include "paimon/core/operation/abstract_file_store_write.h" Review Comment: `MergeTreeCompactManagerFactory` is included from `paimon/core/mergetree/compact/merge_tree_compact_manager_factory.h`, but that header is not present in the repository checkout (and `MergeTreeCompactManagerFactory` cannot be found elsewhere). This will make the build fail; please add the missing header/source to the PR or update the include/type to the correct existing factory implementation. ########## src/paimon/core/operation/append_only_file_store_write.h: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <cstdint> +#include <memory> +#include <optional> +#include <string> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "arrow/type.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/core/compact/cancellation_controller.h" +#include "paimon/core/core_options.h" +#include "paimon/core/deletionvectors/deletion_vector.h" +#include "paimon/core/io/single_file_writer.h" +#include "paimon/core/operation/abstract_file_store_write.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/file_store_write.h" Review Comment: This file includes `paimon/core/operation/abstract_file_store_write.h`, but that header is not present in the repository checkout. As written, this prevents the new writer from compiling; please add the missing base class header/source to the PR or adjust the include to the actual location/name of the base class. ########## src/paimon/core/operation/metrics/compaction_metrics.h: ########## @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <algorithm> +#include <atomic> +#include <mutex> +#include <unordered_map> +#include <vector> + +#include "paimon/common/data/binary_row.h" +#include "paimon/common/metrics/metrics_impl.h" + +namespace paimon { +/// Metrics to measure a compaction. +class CompactionMetrics { + public: + static constexpr int32_t kCompactionTimeWindow = 100; + + static constexpr char MAX_LEVEL0_FILE_COUNT[] = "maxLevel0FileCount"; + static constexpr char AVG_LEVEL0_FILE_COUNT[] = "avgLevel0FileCount"; + static constexpr char AVG_COMPACTION_TIME[] = "avgCompactionTime"; + static constexpr char COMPACTION_COMPLETED_COUNT[] = "compactionCompletedCount"; + static constexpr char COMPACTION_TOTAL_COUNT[] = "compactionTotalCount"; + static constexpr char COMPACTION_QUEUED_COUNT[] = "compactionQueuedCount"; + static constexpr char MAX_COMPACTION_INPUT_SIZE[] = "maxCompactionInputSize"; + static constexpr char MAX_COMPACTION_OUTPUT_SIZE[] = "maxCompactionOutputSize"; + static constexpr char AVG_COMPACTION_INPUT_SIZE[] = "avgCompactionInputSize"; + static constexpr char AVG_COMPACTION_OUTPUT_SIZE[] = "avgCompactionOutputSize"; + static constexpr char MAX_TOTAL_FILE_SIZE[] = "maxTotalFileSize"; + static constexpr char AVG_TOTAL_FILE_SIZE[] = "avgTotalFileSize"; + + class Reporter { + public: + Reporter(CompactionMetrics* metrics, const BinaryRow& partition, int32_t bucket) + : metrics_(metrics), partition_(partition), bucket_(bucket) {} + + void ReportLevel0FileCount(int64_t count) { + level0_file_count_ = count; + } Review Comment: `Reporter`’s 
metric fields (`level0_file_count_`, `compaction_*_size_`, `total_file_size_`) are written by `Report*()` without synchronization, but read concurrently by `CompactionMetrics::{Max,Avg}*()` while only holding `reporter_mutex_` (which does not protect the reporter’s internal fields). This creates a data race in multi-threaded compaction. Consider making these fields `std::atomic<int64_t>` (and using `.store()`/`.load()`), or guarding reads/writes with a shared mutex owned by `CompactionMetrics`. ########## src/paimon/core/operation/key_value_file_store_write_test.cpp: ########## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/operation/key_value_file_store_write.h" + +#include <cstddef> +#include <map> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/array/array_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_nested.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/file_store_write.h" +#include "paimon/record_batch.h" +#include "paimon/status.h" +#include "paimon/testing/utils/test_helper.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" + +namespace paimon::test { + +class KeyValueFileStoreWriteTest : public ::testing::Test { + protected: + Result<std::unique_ptr<FileStoreWrite>> CreateSingleStringFileStoreWrite( + const std::map<std::string, std::string>& table_options, bool with_temp_directory) { + auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(typed_schema, &schema)); + + auto dir = UniqueTestDirectory::Create(); + if (!dir) { + return Status::Invalid("failed to create test directory"); + } + PAIMON_ASSIGN_OR_RAISE(auto catalog, Catalog::Create(dir->Str(), {})); + PAIMON_RETURN_NOT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + PAIMON_RETURN_NOT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{}, + /*primary_keys=*/{"f0"}, table_options, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), "test"); + if (with_temp_directory) { + context_builder.WithTempDirectory(dir->Str()); + } + + 
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<WriteContext> write_context, + context_builder.Finish()); + return FileStoreWrite::Create(std::move(write_context)); + } + + Status WriteSingleStringRow(FileStoreWrite* file_store_write, int32_t bucket, + const std::string& value) { + auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)}; + auto struct_type = arrow::struct_(fields); + arrow::StructBuilder struct_builder(struct_type, arrow::default_memory_pool(), + {std::make_shared<arrow::StringBuilder>()}); + auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Append()); + PAIMON_RETURN_NOT_OK_FROM_ARROW(string_builder->Append(value)); + + std::shared_ptr<arrow::Array> array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Finish(&array)); + ::ArrowArray arrow_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &arrow_array)); + + RecordBatchBuilder batch_builder(&arrow_array); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<RecordBatch> batch, + batch_builder.SetBucket(bucket).Finish()); + Status write_status = file_store_write->Write(std::move(batch)); + if (!ArrowArrayIsReleased(&arrow_array)) { + ArrowArrayRelease(&arrow_array); + } + return write_status; + } +}; + +TEST_F(KeyValueFileStoreWriteTest, TestWriteWithInvalidBatch) { + auto fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32()), + arrow::field("f6", arrow::int32()), arrow::field("f7", arrow::int64()), + arrow::field("f8", arrow::int64()), arrow::field("f9", arrow::float32()), + arrow::field("f10", arrow::float64()), arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), arrow::field("non-partition-field", arrow::int32())}; + std::string commit_user = "test"; + { + arrow::Schema typed_schema(fields); + 
::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f1"}, /*options=*/{{"bucket", "1"}}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), commit_user); + ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + ASSERT_NOK_WITH_MSG(file_store_write->Write(nullptr), "batch is null pointer"); + } + { + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f1"}, /*options=*/{{"bucket", "-2"}}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + commit_user); + ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + auto array = std::make_shared<arrow::Array>(); Review Comment: `arrow::Array` is abstract, so `std::make_shared<arrow::Array>()` will not compile. Declare a `std::shared_ptr<arrow::Array>` and let `StringBuilder::Finish` assign the concrete array instance. 
########## src/paimon/core/operation/key_value_file_store_write_test.cpp: ########## @@ -0,0 +1,391 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/operation/key_value_file_store_write.h" + +#include <cstddef> +#include <map> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/array/array_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_nested.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/file_store_write.h" +#include "paimon/record_batch.h" +#include "paimon/status.h" +#include "paimon/testing/utils/test_helper.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" + +namespace paimon::test { + +class KeyValueFileStoreWriteTest : public ::testing::Test { + protected: + Result<std::unique_ptr<FileStoreWrite>> CreateSingleStringFileStoreWrite( + const std::map<std::string, std::string>& table_options, bool 
with_temp_directory) { + auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)}; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(typed_schema, &schema)); + + auto dir = UniqueTestDirectory::Create(); + if (!dir) { + return Status::Invalid("failed to create test directory"); + } + PAIMON_ASSIGN_OR_RAISE(auto catalog, Catalog::Create(dir->Str(), {})); + PAIMON_RETURN_NOT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + PAIMON_RETURN_NOT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{}, + /*primary_keys=*/{"f0"}, table_options, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), "test"); + if (with_temp_directory) { + context_builder.WithTempDirectory(dir->Str()); + } + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<WriteContext> write_context, + context_builder.Finish()); + return FileStoreWrite::Create(std::move(write_context)); + } + + Status WriteSingleStringRow(FileStoreWrite* file_store_write, int32_t bucket, + const std::string& value) { + auto fields = {arrow::field("f0", arrow::utf8(), /*nullable=*/false)}; + auto struct_type = arrow::struct_(fields); + arrow::StructBuilder struct_builder(struct_type, arrow::default_memory_pool(), + {std::make_shared<arrow::StringBuilder>()}); + auto string_builder = static_cast<arrow::StringBuilder*>(struct_builder.field_builder(0)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Append()); + PAIMON_RETURN_NOT_OK_FROM_ARROW(string_builder->Append(value)); + + std::shared_ptr<arrow::Array> array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder.Finish(&array)); + ::ArrowArray arrow_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &arrow_array)); + + RecordBatchBuilder batch_builder(&arrow_array); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<RecordBatch> batch, + batch_builder.SetBucket(bucket).Finish()); + Status 
write_status = file_store_write->Write(std::move(batch)); + if (!ArrowArrayIsReleased(&arrow_array)) { + ArrowArrayRelease(&arrow_array); + } + return write_status; + } +}; + +TEST_F(KeyValueFileStoreWriteTest, TestWriteWithInvalidBatch) { + auto fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32()), + arrow::field("f6", arrow::int32()), arrow::field("f7", arrow::int64()), + arrow::field("f8", arrow::int64()), arrow::field("f9", arrow::float32()), + arrow::field("f10", arrow::float64()), arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), arrow::field("non-partition-field", arrow::int32())}; + std::string commit_user = "test"; + { + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f1"}, /*options=*/{{"bucket", "1"}}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), commit_user); + ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + ASSERT_NOK_WITH_MSG(file_store_write->Write(nullptr), "batch is null pointer"); + } + { + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + 
ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f1"}, /*options=*/{{"bucket", "-2"}}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + commit_user); + ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + auto array = std::make_shared<arrow::Array>(); + arrow::StringBuilder builder; + for (size_t j = 0; j < 100; j++) { + ASSERT_TRUE(builder.Append(std::to_string(j)).ok()); + } + ASSERT_TRUE(builder.Finish(&array).ok()); + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); + RecordBatchBuilder batch_builder(&arrow_array); + ASSERT_OK_AND_ASSIGN(std::unique_ptr<RecordBatch> batch, + batch_builder.SetBucket(1).Finish()); + ASSERT_NOK_WITH_MSG(file_store_write->Write(std::move(batch)), + "batch bucket is 1 while options bucket is -2"); + ArrowArrayRelease(&arrow_array); + } + { + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), {})); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, /*partition_keys=*/{}, + /*primary_keys=*/{"f1"}, /*options=*/{{"bucket", "2"}}, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + commit_user); + ASSERT_OK_AND_ASSIGN(std::unique_ptr<WriteContext> write_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + auto array 
= std::make_shared<arrow::Array>(); Review Comment: `arrow::Array` is abstract, so `std::make_shared<arrow::Array>()` will not compile. Declare a `std::shared_ptr<arrow::Array>` and let `StringBuilder::Finish` assign the concrete array instance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
