Copilot commented on code in PR #60: URL: https://github.com/apache/paimon-cpp/pull/60#discussion_r3377353234
########## src/paimon/core/utils/manifest_meta_reader.h: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> + +#include "arrow/array/array_base.h" +#include "arrow/c/bridge.h" +#include "arrow/memory_pool.h" +#include "arrow/type.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" +#include "paimon/visibility.h" + +namespace arrow { +class DataType; +} // namespace arrow + +namespace paimon { +class MemoryPool; +class Metrics; + +class PAIMON_EXPORT ManifestMetaReader : public BatchReader { + public: + ManifestMetaReader(std::unique_ptr<BatchReader>&& reader, + const std::shared_ptr<arrow::DataType>& target_type, + const std::shared_ptr<MemoryPool>& pool); + + ~ManifestMetaReader() override { + DoClose(); + } + + Result<ReadBatch> NextBatch() override; + + std::shared_ptr<Metrics> GetReaderMetrics() const override { + return reader_->GetReaderMetrics(); + } + void Close() override { + DoClose(); + } + void DoClose() { + reader_->Close(); + } + + private: + // fill non exist field and correct int precision + static Result<std::shared_ptr<arrow::Array>> AlignArrayWithSchema( + const std::shared_ptr<arrow::Array>& src_array, + const std::shared_ptr<arrow::DataType>& target_type, arrow::MemoryPool* pool); Review Comment: `AlignArrayWithSchema` is declared `private`, but `manifest_meta_reader_test.cpp` calls `ManifestMetaReader::AlignArrayWithSchema(...)` directly. This will not compile. Make the method `public` (if it’s intended as a utility), or expose a test-only hook (e.g., a public wrapper under `#ifdef`), or change the tests to validate behavior via `NextBatch()` instead. ########## src/paimon/core/utils/manifest_meta_reader.cpp: ########## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/utils/manifest_meta_reader.h" + +#include <cstdint> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/api.h" +#include "arrow/array/array_base.h" +#include "arrow/array/array_nested.h" +#include "arrow/array/array_primitive.h" +#include "arrow/array/util.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/compute/cast.h" +#include "arrow/type.h" +#include "arrow/util/checked_cast.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/status.h" + +namespace paimon { +class MemoryPool; + +ManifestMetaReader::ManifestMetaReader(std::unique_ptr<BatchReader>&& reader, + const std::shared_ptr<arrow::DataType>& target_type, + const std::shared_ptr<MemoryPool>& pool) + : reader_(std::move(reader)), target_type_(target_type), pool_(GetArrowPool(pool)) {} + +Result<BatchReader::ReadBatch> ManifestMetaReader::NextBatch() { + PAIMON_ASSIGN_OR_RAISE(ReadBatch src_result, reader_->NextBatch()); + auto& c_array = src_result.first; + auto& c_schema = src_result.second; + if (!c_array) { + return BatchReader::MakeEofBatch(); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> arrow_array, + arrow::ImportArray(c_array.get(), c_schema.get())); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> target_array, + AlignArrayWithSchema(arrow_array, target_type_, pool_.get())); + std::unique_ptr<ArrowArray> target_c_arrow_array = std::make_unique<ArrowArray>(); + std::unique_ptr<ArrowSchema> target_c_schema = std::make_unique<ArrowSchema>(); + + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*target_array, target_c_arrow_array.get(), target_c_schema.get())); + return std::make_pair(std::move(target_c_arrow_array), std::move(target_c_schema)); +} + +Result<std::shared_ptr<arrow::Array>> ManifestMetaReader::AlignArrayWithSchema( + const std::shared_ptr<arrow::Array>& src_array, + const std::shared_ptr<arrow::DataType>& target_type, arrow::MemoryPool* pool) { + const auto src_kind = src_array->type()->id(); + switch (src_kind) { + case arrow::Type::type::LIST: { + auto list_src_array = + arrow::internal::checked_pointer_cast<arrow::ListArray>(src_array); + auto list_target_type = + arrow::internal::checked_pointer_cast<arrow::ListType>(target_type); + if (!list_target_type) { + return Status::Invalid( + "Complete non exist field failed, target type cannot cast to a list data " + "type"); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> converted, + AlignArrayWithSchema(list_src_array->values(), + list_target_type->value_type(), pool)); + auto new_list_type = std::make_shared<arrow::ListType>(converted->type()); + return std::make_shared<arrow::ListArray>( + new_list_type, list_src_array->length(), list_src_array->value_offsets(), converted, + list_src_array->null_bitmap(), list_src_array->null_count(), + list_src_array->offset()); + } + case arrow::Type::type::MAP: { + auto map_src_array = arrow::internal::checked_pointer_cast<arrow::MapArray>(src_array); + auto map_target_type = + arrow::internal::checked_pointer_cast<arrow::MapType>(target_type); + if (!map_target_type) { + return Status::Invalid( + "Complete non exist field failed, target type cannot cast to a map data " + "type"); + } + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr<arrow::Array> key_converted, + AlignArrayWithSchema(map_src_array->keys(), map_target_type->key_type(), pool)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr<arrow::Array> value_converted, + AlignArrayWithSchema(map_src_array->items(), map_target_type->item_type(), pool)); + auto new_map_type = + std::make_shared<arrow::MapType>(key_converted->type(), value_converted->type()); + return std::make_shared<arrow::MapArray>( + new_map_type, map_src_array->length(), map_src_array->value_offsets(), + key_converted, value_converted, map_src_array->null_bitmap(), + map_src_array->null_count(), map_src_array->offset()); + } + case arrow::Type::type::STRUCT: { + auto struct_src_array = + arrow::internal::checked_pointer_cast<arrow::StructArray>(src_array); + auto struct_target_type = + arrow::internal::checked_pointer_cast<arrow::StructType>(target_type); + if (!struct_target_type) { + return Status::Invalid( + "Complete non exist field failed, target type cannot cast to a struct data " + "type"); + } + std::vector<std::string> field_names; + arrow::ArrayVector converted_array; + field_names.reserve(target_type->num_fields()); + converted_array.reserve(target_type->num_fields()); + for (int32_t i = 0; i < struct_target_type->num_fields(); i++) { + const auto& target_field = struct_target_type->field(i); + const std::string& target_name = target_field->name(); + field_names.push_back(target_name); + auto src_field = struct_src_array->GetFieldByName(target_name); + if (src_field) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr<arrow::Array> converted, + AlignArrayWithSchema(src_field, target_field->type(), pool)); + converted_array.push_back(converted); + } else { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr<arrow::Array> null_array, + arrow::MakeArrayOfNull(target_field->type(), struct_src_array->length(), + pool)); + converted_array.push_back(null_array); + } + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr<arrow::Array> arrow_array, + arrow::StructArray::Make( + converted_array, field_names, struct_src_array->null_bitmap(), + struct_src_array->null_count(), struct_src_array->offset())); + return arrow_array; + } + case arrow::Type::type::INT32: + // cast for avro format, avro store int8,int16,int32 as int32, so need to cast to + // correct type + if (src_kind != target_type->id()) { + arrow::compute::CastOptions cast_options; + cast_options.allow_int_overflow = false; + auto int32_array = + arrow::internal::checked_pointer_cast<arrow::Int32Array>(src_array); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr<arrow::Array> result, + arrow::compute::Cast(*int32_array, target_type, cast_options)); + return result; + } + default: + return src_array; Review Comment: The `INT32` switch case falls through to `default` when no cast is needed, which can trigger `-Wimplicit-fallthrough` (often treated as an error in CI). Add an explicit `return src_array;` (or `break` + return after the switch) when `src_kind == target_type->id()`, or annotate with `[[fallthrough]]` if intentional. ########## src/paimon/core/utils/file_store_path_factory.cpp: ########## @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/utils/file_store_path_factory.h" + +#include <cassert> + +#include "paimon/common/fs/external_path_provider.h" +#include "paimon/common/memory/memory_segment.h" +#include "paimon/common/utils/uuid.h" +#include "paimon/core/index/index_file_meta.h" +#include "paimon/core/index/index_in_data_file_dir_path_factory.h" +#include "paimon/core/io/data_file_path_factory.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/core/utils/partition_path_utils.h" +#include "paimon/core/utils/path_factory.h" +#include "paimon/macros.h" +#include "paimon/status.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class MemoryPool; + +FileStorePathFactory::FileStorePathFactory( + const std::string& root, const std::string& format_identifier, + const std::string& data_file_prefix, const std::string& uuid, + std::unique_ptr<BinaryRowPartitionComputer> partition_computer, + const std::vector<std::string>& external_paths, + const std::optional<std::string>& global_index_external_path, bool index_file_in_data_file_dir) + : root_(root), + format_identifier_(format_identifier), + data_file_prefix_(data_file_prefix), + uuid_(uuid), + partition_computer_(std::move(partition_computer)), + external_paths_(external_paths), + global_index_external_path_(global_index_external_path), + index_file_in_data_file_dir_(index_file_in_data_file_dir) {} + +Result<std::unique_ptr<FileStorePathFactory>> FileStorePathFactory::Create( + const std::string& root, const std::shared_ptr<arrow::Schema>& schema, + const std::vector<std::string>& partition_keys, const std::string& default_part_value, + const std::string& identifier, const std::string& data_file_prefix, + bool legacy_partition_name_enabled, const std::vector<std::string>& external_paths, + const std::optional<std::string>& global_index_external_path, bool index_file_in_data_file_dir, + const std::shared_ptr<MemoryPool>& memory_pool) { + if (memory_pool == nullptr) { + return Status::Invalid("memory pool is null pointer"); + } + std::string uuid; + if (PAIMON_UNLIKELY(!UUID::Generate(&uuid))) { + return Status::Invalid("fail to generate uuid for file store path factory"); + } + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr<BinaryRowPartitionComputer> partition_computer, + BinaryRowPartitionComputer::Create(partition_keys, schema, default_part_value, + legacy_partition_name_enabled, memory_pool)); + return std::unique_ptr<FileStorePathFactory>(new FileStorePathFactory( + root, identifier, data_file_prefix, uuid, std::move(partition_computer), external_paths, + global_index_external_path, index_file_in_data_file_dir)); +} + +std::unique_ptr<PathFactory> FileStorePathFactory::CreateManifestFileFactory() { + class ManifestFileFactory : public PathFactory { + public: + explicit ManifestFileFactory(const std::shared_ptr<FileStorePathFactory>& factory) + : factory_(factory) { + assert(factory_); + } + + std::string NewPath() const override { + return factory_->NewManifestFile(); + } + std::string ToPath(const std::string& file_name) const override { + return factory_->ToManifestFilePath(file_name); + } + + private: + std::shared_ptr<FileStorePathFactory> factory_ = nullptr; + }; + return std::make_unique<ManifestFileFactory>(shared_from_this()); +} +std::unique_ptr<PathFactory> FileStorePathFactory::CreateManifestListFactory() { + class ManifestListFactory : public PathFactory { + public: + explicit ManifestListFactory(const std::shared_ptr<FileStorePathFactory>& factory) + : factory_(factory) { + assert(factory_); + } + + std::string NewPath() const override { + return factory_->NewManifestList(); + } + std::string ToPath(const std::string& file_name) const override { + return factory_->ToManifestListPath(file_name); + } + + private: + std::shared_ptr<FileStorePathFactory> factory_; + }; + return std::make_unique<ManifestListFactory>(shared_from_this()); +} +std::unique_ptr<PathFactory> FileStorePathFactory::CreateIndexManifestFileFactory() { + class IndexManifestFileFactory : public PathFactory { + public: + explicit IndexManifestFileFactory(const std::shared_ptr<FileStorePathFactory>& factory) + : factory_(factory) { + assert(factory_); + } + + std::string NewPath() const override { + return factory_->NewIndexManifestFile(); + } + std::string ToPath(const std::string& file_name) const override { + return factory_->ToManifestFilePath(file_name); + } + + private: + std::shared_ptr<FileStorePathFactory> factory_; + }; + return std::make_unique<IndexManifestFileFactory>(shared_from_this()); +} + +Result<std::unique_ptr<IndexPathFactory>> FileStorePathFactory::CreateIndexFileFactory( + const BinaryRow& partition, int32_t bucket) { + if (index_file_in_data_file_dir_) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<DataFilePathFactory> data_file_path_factory, + CreateDataFilePathFactory(partition, bucket)); + return std::make_unique<IndexInDataFileDirPathFactory>(uuid_, index_file_count_, + data_file_path_factory); + } + return CreateGlobalIndexFileFactory(); +} + +std::unique_ptr<IndexPathFactory> FileStorePathFactory::CreateGlobalIndexFileFactory() { + class IndexPathFactoryImpl : public IndexPathFactory { + public: + explicit IndexPathFactoryImpl(const std::shared_ptr<FileStorePathFactory>& factory) + : factory_(factory) { + assert(factory_); + } + std::string NewPath() const override { + return factory_->NewIndexFile(); + } + std::string ToPath(const std::shared_ptr<IndexFileMeta>& file) const override { + const auto& external_path = file->ExternalPath(); + if (external_path) { + return external_path.value(); + } + return PathUtil::JoinPath(factory_->IndexPath(factory_->RootPath()), file->FileName()); + } + std::string ToPath(const std::string& file_name) const override { + const auto& external_path = factory_->GetGlobalIndexExternalPath(); + if (external_path) { + return PathUtil::JoinPath(external_path.value(), file_name); + } + return PathUtil::JoinPath(factory_->IndexPath(factory_->RootPath()), file_name); + } + bool IsExternalPath() const override { + return factory_->GetGlobalIndexExternalPath() != std::nullopt; + } + + private: + std::shared_ptr<FileStorePathFactory> factory_; + }; + return std::make_unique<IndexPathFactoryImpl>(shared_from_this()); +} + +Result<std::shared_ptr<DataFilePathFactory>> FileStorePathFactory::CreateDataFilePathFactory( + const BinaryRow& partition, int32_t bucket) const { + auto data_file_path_factory = std::make_shared<DataFilePathFactory>(); + PAIMON_ASSIGN_OR_RAISE(std::string bucket_path, BucketPath(partition, bucket)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ExternalPathProvider> external_path_provider, + CreateExternalPathProvider(partition, bucket)); + PAIMON_RETURN_NOT_OK(data_file_path_factory->Init( + bucket_path, format_identifier_, data_file_prefix_, std::move(external_path_provider))); + return data_file_path_factory; +} + +Result<std::unique_ptr<ExternalPathProvider>> FileStorePathFactory::CreateExternalPathProvider( + const BinaryRow& partition, int32_t bucket) const { + if (external_paths_.empty()) { + return std::unique_ptr<ExternalPathProvider>(); + } + PAIMON_ASSIGN_OR_RAISE(std::string bucket_path, RelativeBucketPath(partition, bucket)); + return ExternalPathProvider::Create(external_paths_, bucket_path); +} + +Result<std::string> FileStorePathFactory::RelativeBucketPath(const BinaryRow& partition, + int32_t bucket) const { + PAIMON_ASSIGN_OR_RAISE(std::string partition_path, GetPartitionString(partition)); + std::string bucket_name = + bucket == BucketModeDefine::POSTPONE_BUCKET ? "postpone" : std::to_string(bucket); + return PathUtil::JoinPath(partition_path, std::string(BUCKET_PATH_PREFIX) + bucket_name); +} + +Result<std::string> FileStorePathFactory::BucketPath(const BinaryRow& partition, + int32_t bucket) const { + PAIMON_ASSIGN_OR_RAISE(std::string relative_bucket_path, RelativeBucketPath(partition, bucket)); + return PathUtil::JoinPath(root_, relative_bucket_path); +} + +Result<std::string> FileStorePathFactory::GetPartitionString(const BinaryRow& partition) const { + if (partition.GetSizeInBytes() == 0) { + return Status::Invalid("invalid binary row partition"); + } + auto iter = row_to_str_cache_.find(partition); + if (PAIMON_LIKELY(iter != row_to_str_cache_.end())) { + return iter->second; + } + std::vector<std::pair<std::string, std::string>> part_values; + PAIMON_ASSIGN_OR_RAISE(part_values, partition_computer_->GeneratePartitionVector(partition)); + PAIMON_ASSIGN_OR_RAISE(std::string part_str, + PartitionPathUtils::GeneratePartitionPath(part_values)) + return row_to_str_cache_.insert({partition, part_str}).first->second; Review Comment: Missing semicolon after the `PAIMON_ASSIGN_OR_RAISE(...)` invocation (line 231). As written, this should fail to compile. Add the trailing `;` after the macro call. ########## src/paimon/core/utils/tag_manager.cpp: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/utils/tag_manager.h" + +#include <memory> +#include <stdexcept> +#include <string> + +#include "fmt/format.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/tag/tag.h" +#include "paimon/core/utils/branch_manager.h" +#include "paimon/fs/file_system.h" + +namespace paimon { + +TagManager::TagManager(const std::shared_ptr<FileSystem>& fs, const std::string& root_path) + : TagManager(fs, root_path, BranchManager::DEFAULT_MAIN_BRANCH) {} + +TagManager::TagManager(const std::shared_ptr<FileSystem>& fs, const std::string& root_path, + const std::string& branch) + : fs_(fs), root_path_(root_path), branch_(BranchManager::NormalizeBranch(branch)) {} + +Result<Tag> TagManager::GetOrThrow(const std::string& tag_name) const { + PAIMON_ASSIGN_OR_RAISE(std::optional<Tag> tag, Get(tag_name)); + if (tag == std::nullopt) { + return Status::NotExist(fmt::format("Tag '{}' doesn't exist.", tag_name)); + } + return tag.value(); +} + +Result<std::optional<Tag>> TagManager::Get(const std::string& tag_name) const { + std::string tag_path = TagPath(tag_name); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(tag_path)); + if (!is_exist) { + return std::optional<Tag>(); + } + PAIMON_ASSIGN_OR_RAISE(Tag tag, Tag::FromPath(fs_, tag_path)); + return std::optional<Tag>(std::move(tag)); +} Review Comment: `TagManager` is constructed with `nullptr` filesystem in `TagManagerTest.TestTagPath`, but `Get()` unconditionally dereferences `fs_`. This makes the public API unsafe (nullptr deref) if `Get/GetOrThrow` are called on an instance created for path-only usage. Consider either (a) disallowing `nullptr` in the constructor (and adjust the test), or (b) returning `Status::Invalid(...)` from `Get/GetOrThrow` when `fs_` is null. ########## src/paimon/core/utils/index_file_path_factories.h: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include <cstddef> +#include <cstdint> +#include <memory> +#include <optional> +#include <utility> + +#include "paimon/common/data/binary_row.h" +#include "paimon/common/utils/concurrent_hash_map.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/result.h" + +namespace paimon { +class IndexPathFactory; + +/// Cache for index `PathFactory`s. +class IndexFilePathFactories { + public: + explicit IndexFilePathFactories(const std::shared_ptr<FileStorePathFactory>& path_factory) + : path_factory_(path_factory) {} + + Result<std::shared_ptr<IndexPathFactory>> Get(const BinaryRow& partition, int32_t bucket) { + auto key = std::make_pair(partition, bucket); + auto cached_factory = cache_.Find(key); + if (cached_factory) { + return cached_factory.value(); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<IndexPathFactory> index_path_factory, + path_factory_->CreateIndexFileFactory(partition, bucket)); + cache_.Insert(key, index_path_factory); + return index_path_factory; + } Review Comment: This is a classic check-then-act race on a concurrent cache: multiple threads can miss `Find`, create duplicate factories, and then contend on `Insert`. Prefer an atomic insert-or-get API (if `ConcurrentHashMap` supports it), or insert a placeholder / use a double-checked pattern that returns the value that actually landed in the map. Also, `key` copies `BinaryRow`; ensure `BinaryRow` is safe/cheap to copy for use as a stable hash key (otherwise consider storing an owned/normalized key representation). ########## src/paimon/core/utils/snapshot_manager.cpp: ########## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/utils/snapshot_manager.h" + +#include <algorithm> +#include <chrono> +#include <random> +#include <thread> +#include <utility> + +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/utils/branch_manager.h" +#include "paimon/core/utils/file_utils.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace paimon { + +SnapshotManager::SnapshotManager(const std::shared_ptr<FileSystem>& fs, + const std::string& root_path) + : SnapshotManager(fs, root_path, BranchManager::DEFAULT_MAIN_BRANCH) {} + +SnapshotManager::SnapshotManager(const std::shared_ptr<FileSystem>& fs, + const std::string& root_path, const std::string& branch) + : fs_(fs), root_path_(root_path), branch_(BranchManager::NormalizeBranch(branch)) {} + +SnapshotManager::~SnapshotManager() = default; + +const std::shared_ptr<FileSystem>& SnapshotManager::Fs() const { + return fs_; +} + +const std::string& SnapshotManager::RootPath() const { + return root_path_; +} + +const std::string& SnapshotManager::Branch() const { + return branch_; +} + +Result<std::optional<Snapshot>> SnapshotManager::LatestSnapshotOfUser(const std::string& user) { + PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> latest_id, LatestSnapshotId()); + if (latest_id == std::nullopt) { + return std::optional<Snapshot>(); + } + PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> earliest_id, EarliestSnapshotId()); + if (earliest_id == std::nullopt) { + return Status::Invalid( + "Latest snapshot id is not null, but earliest snapshot id is null. This is " + "unexpected."); + } + + for (int64_t id = latest_id.value(); id >= earliest_id.value(); id--) { + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, LoadSnapshot(id)); + if (snapshot.CommitUser() == user) { + return std::optional<Snapshot>(snapshot); + } + } + return std::optional<Snapshot>(); +} + +Result<Snapshot> SnapshotManager::LoadSnapshot(int64_t snapshot_id) const { + return Snapshot::FromPath(fs_, SnapshotPath(snapshot_id)); +} + +Result<std::optional<Snapshot>> SnapshotManager::LatestSnapshot() const { + PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> snapshot_id, LatestSnapshotId()); + if (snapshot_id == std::nullopt) { + return std::optional<Snapshot>(); + } else { + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, LoadSnapshot(snapshot_id.value())); + return std::optional<Snapshot>(snapshot); + } +} + +Result<std::optional<int64_t>> SnapshotManager::LatestSnapshotId() const { + return FindLatest( + SnapshotDirectory(), std::string(SNAPSHOT_PREFIX), + [this](int64_t snapshot_id) -> std::string { return SnapshotPath(snapshot_id); }); +} + +Result<std::optional<int64_t>> SnapshotManager::EarliestSnapshotId() const { + return FindEarliest( + SnapshotDirectory(), std::string(SNAPSHOT_PREFIX), + [this](int64_t snapshot_id) -> std::string { return SnapshotPath(snapshot_id); }); +} + +std::string SnapshotManager::BranchPath() const { + return BranchManager::BranchPath(root_path_, branch_); +} + +std::string SnapshotManager::SnapshotDirectory() const { + return PathUtil::JoinPath(BranchPath(), "/snapshot"); +} + +Result<bool> SnapshotManager::SnapshotExists(int64_t snapshot_id) const { + return fs_->Exists(SnapshotPath(snapshot_id)); +} +std::string SnapshotManager::SnapshotPath(int64_t snapshot_id) const { + return PathUtil::JoinPath( + BranchPath(), "/snapshot/" + std::string(SNAPSHOT_PREFIX) + std::to_string(snapshot_id)); +} + +Result<std::optional<int64_t>> SnapshotManager::FindEarliest( + const std::string& dir, const std::string& prefix, + const std::function<std::string(int64_t)>& path_func) const { + PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(dir)); + if (!is_exist) { + return std::optional<int64_t>(); + } + std::optional<int64_t> snapshot_id = ReadHint(EARLIEST, dir); + if (snapshot_id != std::nullopt) { + std::string path = path_func(snapshot_id.value()); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(path)); + if (is_exist) { + return snapshot_id; + } + } + return FindByListFiles([](int64_t lhs, int64_t rhs) -> int64_t { return std::min(lhs, rhs); }, + dir, prefix); +} + +Result<std::optional<int64_t>> SnapshotManager::FindLatest( + const std::string& dir, const std::string& prefix, + const std::function<std::string(int64_t)>& path_func) const { + PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(dir)); + if (!is_exist) { + return std::optional<int64_t>(); + } + std::optional<int64_t> snapshot_id = ReadHint(LATEST, dir); + if (snapshot_id != std::nullopt && snapshot_id.value() > 0) { + int64_t next_snapshot = snapshot_id.value() + 1; + // it is the latest only there is no next one + std::string path = path_func(next_snapshot); + PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(path)); + if (!is_exist) { + return snapshot_id; + } + } + return FindByListFiles([](int64_t lhs, int64_t rhs) -> int64_t { return std::max(lhs, rhs); }, + dir, prefix); +} + +Result<std::optional<int64_t>> SnapshotManager::FindByListFiles( + const std::function<int64_t(int64_t, int64_t)> reducer_func, const std::string& dir, + const std::string& prefix) const { + std::vector<int64_t> versions; + PAIMON_RETURN_NOT_OK(FileUtils::ListVersionedFiles(fs_, dir, prefix, &versions)); + if (versions.empty()) { + return std::optional<int64_t>(); + } + int64_t ret = versions[0]; + for (const auto& version : versions) { + ret = reducer_func(ret, version); + } + return std::optional<int64_t>(ret); +} + +Result<std::set<std::string>> SnapshotManager::TryGetNonSnapshotFiles(int64_t older_than_ms) const { + std::set<std::string> non_snapshot_files; + + std::vector<std::unique_ptr<FileStatus>> file_status_list; + PAIMON_RETURN_NOT_OK(fs_->ListFileStatus(SnapshotDirectory(), &file_status_list)); + for (const auto& file_status : file_status_list) { + std::string file_name = PathUtil::GetName(file_status->GetPath()); + if (!StringUtils::StartsWith(file_name, std::string(SNAPSHOT_PREFIX)) && + file_name != std::string(EARLIEST) && file_name != std::string(LATEST)) { + if (file_status->GetModificationTime() < older_than_ms) { + non_snapshot_files.insert(file_status->GetPath()); + } + } + } + return non_snapshot_files; +} + +// Noted that: try best to read hint to avoid unnecessary list dir operation, if failed, do not +// return error +std::optional<int64_t> SnapshotManager::ReadHint(const std::string& file_name, + const std::string& dir) const { + std::string path = PathUtil::JoinPath(dir, file_name); + int32_t retry_number = 0; + while (retry_number++ < READ_HINT_RETRY_NUM) { + std::string content; + Status s = fs_->ReadFile(path, &content); + if (s.ok()) { + return StringUtils::StringToValue<int64_t>(content); + } + std::this_thread::sleep_for(std::chrono::milliseconds(READ_HINT_RETRY_INTERVAL)); + } + return std::nullopt; +} + +Status SnapshotManager::CommitLatestHint(int64_t snapshot_id) { + return CommitHint(snapshot_id, LATEST, SnapshotDirectory()); +} + +Status SnapshotManager::CommitEarliestHint(int64_t snapshot_id) { + return CommitHint(snapshot_id, EARLIEST, SnapshotDirectory()); +} + +Status SnapshotManager::CommitHint(int64_t snapshot_id, const std::string& file_name, + const std::string& dir) { + std::string path = PathUtil::JoinPath(dir, file_name); + std::string snapshot_id_str = std::to_string(snapshot_id); + int32_t loop_time = 3; + Status s; + while (loop_time-- > 0) { + s = fs_->WriteFile(path, snapshot_id_str, /*overwrite=*/true); + if (s.ok()) { + return s; + } else { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<int> dist(0, 999); + int64_t sleep_time = dist(gen) + 500; + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); + } + } + return s; +} + +Result<std::vector<Snapshot>> SnapshotManager::GetAllSnapshots() const { + std::vector<Snapshot> snapshots; + std::vector<std::unique_ptr<BasicFileStatus>> file_statuses; + PAIMON_RETURN_NOT_OK(FileUtils::ListVersionedFileStatus(fs_, SnapshotDirectory(), + SNAPSHOT_PREFIX, &file_statuses)); + for (const auto& file_status : file_statuses) { + auto snapshot_path = file_status->GetPath(); + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, Snapshot::FromPath(fs_, snapshot_path)); + snapshots.push_back(snapshot); + } + return snapshots; +} + +Result<std::optional<Snapshot>> SnapshotManager::EarlierOrEqualTimeMillis( + int64_t timestamp_millis) const { + return FindSnapshotBeforeTimestamp(timestamp_millis, std::less_equal<int64_t>{}); +} + +Result<std::optional<Snapshot>> SnapshotManager::EarlierThanTimeMillis( + int64_t timestamp_millis) const { + return FindSnapshotBeforeTimestamp(timestamp_millis, std::less<int64_t>{}); +} + +Result<std::optional<Snapshot>> SnapshotManager::FindSnapshotBeforeTimestamp( + int64_t timestamp_millis, const std::function<bool(int64_t, int64_t)>& compare) const { + PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> latest_id, LatestSnapshotId()); + if (latest_id == std::nullopt) { + return std::optional<Snapshot>(); + } + + PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> earliest_id, EarliestSnapshotId()); + if (earliest_id == std::nullopt) { + return std::optional<Snapshot>(); + } + + PAIMON_ASSIGN_OR_RAISE(Snapshot earliest_snapshot, LoadSnapshot(earliest_id.value())); + if (!compare(earliest_snapshot.TimeMillis(), timestamp_millis)) { + return std::optional<Snapshot>(); + } + + int64_t lo = earliest_id.value(); + int64_t hi = latest_id.value(); + std::optional<Snapshot> result; + + while (lo <= hi) { + int64_t mid = lo + (hi - lo) / 2; + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, LoadSnapshot(mid)); + if (compare(snapshot.TimeMillis(), timestamp_millis)) { + lo = mid + 1; + result = std::move(snapshot); + } else { + hi = mid - 1; + } + } Review Comment: The binary search assumes snapshot IDs are contiguous. If there are holes (deleted snapshots), `LoadSnapshot(mid)` can fail and the whole query errors, even though a valid answer exists. Consider performing the search over a sorted vector of existing snapshot IDs from `ListVersionedFiles`/`ListVersionedFileStatus`, or add logic to handle missing `mid` (e.g., probe neighbors or adjust bounds based on nearest existing ID). ########## src/paimon/core/utils/snapshot_manager.cpp: ########## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/utils/snapshot_manager.h" + +#include <algorithm> +#include <chrono> +#include <random> +#include <thread> +#include <utility> + +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/utils/branch_manager.h" +#include "paimon/core/utils/file_utils.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace paimon { + +SnapshotManager::SnapshotManager(const std::shared_ptr<FileSystem>& fs, + const std::string& root_path) + : SnapshotManager(fs, root_path, BranchManager::DEFAULT_MAIN_BRANCH) {} + +SnapshotManager::SnapshotManager(const std::shared_ptr<FileSystem>& fs, + const std::string& root_path, const std::string& branch) + : fs_(fs), root_path_(root_path), branch_(BranchManager::NormalizeBranch(branch)) {} + +SnapshotManager::~SnapshotManager() = default; + +const std::shared_ptr<FileSystem>& SnapshotManager::Fs() const { + return fs_; +} + +const std::string& SnapshotManager::RootPath() const { + return root_path_; +} + +const std::string& SnapshotManager::Branch() const { + return branch_; +} + +Result<std::optional<Snapshot>> SnapshotManager::LatestSnapshotOfUser(const std::string& user) { + PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> latest_id, LatestSnapshotId()); + if (latest_id == std::nullopt) { + return std::optional<Snapshot>(); + } + PAIMON_ASSIGN_OR_RAISE(std::optional<int64_t> earliest_id, EarliestSnapshotId()); + if (earliest_id == std::nullopt) { + return Status::Invalid( + "Latest snapshot id is not null, but earliest snapshot id is null. This is " + "unexpected."); + } + + for (int64_t id = latest_id.value(); id >= earliest_id.value(); id--) { + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, LoadSnapshot(id)); + if (snapshot.CommitUser() == user) { + return std::optional<Snapshot>(snapshot); + } + } Review Comment: This loop assumes snapshot IDs in `[earliest, latest]` are contiguous and that every `snapshot-N` exists. In practice, snapshot expiration/cleanup can introduce gaps; `LoadSnapshot(id)` would then error out and abort the search even if an older matching snapshot exists. A robust approach is to iterate over the existing snapshot IDs (e.g., via `FileUtils::ListVersionedFiles`) in descending order, or to `SnapshotExists(id)` and `continue` on missing snapshots (treating NotExist as skip). ########## src/paimon/core/utils/versioned_object_serializer.h: ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <memory> + +#include "paimon/core/utils/object_serializer.h" +#include "paimon/core/utils/offset_row.h" + +namespace paimon { +/// An `ObjectSerializer` for versioned serialization. +template <typename T> +class PAIMON_EXPORT VersionedObjectSerializer : public ObjectSerializer<T> { + public: + explicit VersionedObjectSerializer(const std::shared_ptr<MemoryPool>& pool) + : ObjectSerializer<T>(VersionType(T::DataType()), pool) {} + + static std::shared_ptr<arrow::DataType> VersionType( + const std::shared_ptr<arrow::DataType>& data_type) { + auto struct_type = arrow::internal::checked_pointer_cast<arrow::StructType>(data_type); + if (!struct_type) { + assert(false); + return nullptr; + } + return struct_type + ->AddField(0, arrow::field("_VERSION", arrow::int32(), /*nullable=*/false)) + .ValueOr(nullptr); + } Review Comment: Error handling here is unsafe in release builds: `checked_pointer_cast` may DCHECK/abort (and the subsequent null-check is inconsistent), and `AddField(...).ValueOr(nullptr)` silently drops Arrow errors and can return `nullptr`, which would flow into `ObjectSerializer` and likely crash later. Consider switching to `dynamic_pointer_cast` (or returning a proper `Result<std::shared_ptr<arrow::DataType>>`) and propagating `AddField` failures via `Result/Status` instead of returning `nullptr`. ########## src/paimon/core/utils/objects_file.h: ########## @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <functional> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "paimon/common/data/columnar/columnar_row.h" +#include "paimon/common/utils/arrow/arrow_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/io/meta_to_arrow_array_converter.h" +#include "paimon/core/utils/manifest_meta_reader.h" +#include "paimon/core/utils/object_serializer.h" +#include "paimon/core/utils/path_factory.h" +#include "paimon/format/format_writer.h" +#include "paimon/format/reader_builder.h" +#include "paimon/format/writer_builder.h" +#include "paimon/fs/file_system.h" +#include "paimon/record_batch.h" + +namespace paimon { +/// A file which contains several `T`s, provides read and write. +class PredicateFilter; +template <typename T> +class ObjectsFile { + public: + ObjectsFile(const std::shared_ptr<FileSystem>& file_system, + const std::shared_ptr<ReaderBuilder>& reader_builder, + const std::shared_ptr<WriterBuilder>& writer_builder, + std::unique_ptr<ObjectSerializer<T>>&& serializer, const std::string& compression, + const std::shared_ptr<PathFactory>& path_factory, + const std::shared_ptr<MemoryPool>& pool); + + virtual ~ObjectsFile() = default; + + Status Read(const std::string& file_name, const std::function<Result<bool>(const T&)>& filter, + std::vector<T>* result) const; + Status ReadIfFileExist(const std::string& file_name, + const std::function<Result<bool>(const T&)>& filter, + std::vector<T>* result) const; + + void DeleteQuietly(const std::string& file_name) { + std::string path = path_factory_->ToPath(file_name); + auto status = file_system_->Delete(path); + // delete quietly will ignore any status error + (void)status; + } + + Result<std::pair<std::string, int64_t>> WriteWithoutRolling(const std::vector<T>& records); + + protected: + std::shared_ptr<PathFactory> path_factory_; + std::shared_ptr<MemoryPool> pool_; + std::unique_ptr<ObjectSerializer<T>> serializer_; + std::shared_ptr<WriterBuilder> writer_builder_; + std::unique_ptr<MetaToArrowArrayConverter> to_array_converter_; + + private: + std::shared_ptr<FileSystem> file_system_; + std::shared_ptr<ReaderBuilder> reader_builder_; + std::string compression_; +}; + +template <typename T> +ObjectsFile<T>::ObjectsFile(const std::shared_ptr<FileSystem>& file_system, + const std::shared_ptr<ReaderBuilder>& reader_builder, + const std::shared_ptr<WriterBuilder>& writer_builder, + std::unique_ptr<ObjectSerializer<T>>&& serializer, + const std::string& compression, + const std::shared_ptr<PathFactory>& path_factory, + const std::shared_ptr<MemoryPool>& pool) + : path_factory_(path_factory), + pool_(pool), + serializer_(std::move(serializer)), + writer_builder_(std::move(writer_builder)), + file_system_(file_system), + reader_builder_(std::move(reader_builder)), + compression_(compression) { + // TODO(xinyu.lxy): add cache +} + +template <typename T> +Status ObjectsFile<T>::ReadIfFileExist(const std::string& file_name, + const std::function<Result<bool>(const T&)>& filter, + std::vector<T>* result) const { + std::string file_path = path_factory_->ToPath(file_name); + PAIMON_ASSIGN_OR_RAISE(bool path_exist, file_system_->Exists(file_path)); + if (path_exist) { + return Read(file_name, filter, result); + } + return Status::OK(); +} + +template <typename T> +Status ObjectsFile<T>::Read(const std::string& file_name, + const std::function<Result<bool>(const T&)>& filter, + std::vector<T>* result) const { + std::string file_path = path_factory_->ToPath(file_name); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<InputStream> file_input_stream, + file_system_->Open(file_path)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileBatchReader> batch_reader, + reader_builder_->Build(file_input_stream)); + auto reader = std::make_unique<ManifestMetaReader>(std::move(batch_reader), + serializer_->GetDataType(), pool_); + while (true) { + PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatch arrow_array, reader->NextBatch()); + auto& c_array = arrow_array.first; + auto& c_schema = arrow_array.second; + if (!c_array) { + break; + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> typed_array, + arrow::ImportArray(c_array.get(), c_schema.get())); + auto* struct_array = dynamic_cast<arrow::StructArray*>(typed_array.get()); + if (!struct_array) { + return Status::Invalid(fmt::format("file {}, cannot cast to struct array", file_name)); + } + result->reserve(struct_array->length()); + for (int64_t i = 0; i < struct_array->length(); i++) { + ColumnarRow row(struct_array->fields(), pool_, i); Review Comment: `reserve(struct_array->length())` ignores the number of items already accumulated in `result` across batches, so it may fail to pre-allocate enough capacity and cause extra reallocations. Prefer reserving `result->size() + struct_array->length()` (or only reserving when capacity is insufficient). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
