Copilot commented on code in PR #91:
URL: https://github.com/apache/paimon-cpp/pull/91#discussion_r3432647998


##########
src/paimon/core/operation/file_system_write_restore.h:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#pragma once
+
+#include <limits>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "paimon/core/core_options.h"
+#include "paimon/core/index/index_file_handler.h"
+#include "paimon/core/operation/file_store_scan.h"
+#include "paimon/core/operation/restore_files.h"
+#include "paimon/core/operation/write_restore.h"
+#include "paimon/core/utils/snapshot_manager.h"
+
+namespace paimon {
+
+/// `WriteRestore` to restore files directly from file system.
+class FileSystemWriteRestore : public WriteRestore {
+ public:
+    FileSystemWriteRestore(const std::shared_ptr<SnapshotManager>& 
snapshot_manager,
+                           std::unique_ptr<FileStoreScan>&& scan,
+                           const std::shared_ptr<IndexFileHandler>& 
index_file_handler)
+        : snapshot_manager_(snapshot_manager),
+          scan_(std::move(scan)),
+          index_file_handler_(index_file_handler) {}
+
+    Result<int64_t> LatestCommittedIdentifier(const std::string& user) const 
override {
+        // TODO(yonghao.fyh): in java paimon is 
LatestSnapshotOfUserFromFileSystem
+        PAIMON_ASSIGN_OR_RAISE(std::optional<Snapshot> latest_snapshot,
+                               snapshot_manager_->LatestSnapshotOfUser(user));
+        if (latest_snapshot) {
+            return latest_snapshot.value().CommitIdentifier();
+        }
+        return std::numeric_limits<int64_t>::min();
+    }
+
+    Result<std::shared_ptr<RestoreFiles>> GetRestoreFiles(
+        const BinaryRow& partition, int32_t bucket,
+        bool scan_deletion_vectors_index) const override {
+        // TODO(yonghao.fyh): java paimon doesn't use 
snapshot_manager.LatestSnapshot() here,
+        // because they don't want to flood the catalog with high concurrency
+        PAIMON_ASSIGN_OR_RAISE(std::optional<Snapshot> snapshot,
+                               snapshot_manager_->LatestSnapshot());

Review Comment:
   GetRestoreFiles() dereferences snapshot_manager_ without a null check. If 
this class is constructed with a null SnapshotManager, this will crash.



##########
src/paimon/core/operation/file_system_write_restore.h:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#pragma once
+
+#include <limits>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "paimon/core/core_options.h"
+#include "paimon/core/index/index_file_handler.h"
+#include "paimon/core/operation/file_store_scan.h"
+#include "paimon/core/operation/restore_files.h"
+#include "paimon/core/operation/write_restore.h"
+#include "paimon/core/utils/snapshot_manager.h"
+
+namespace paimon {
+
+/// `WriteRestore` to restore files directly from file system.
+class FileSystemWriteRestore : public WriteRestore {
+ public:
+    FileSystemWriteRestore(const std::shared_ptr<SnapshotManager>& 
snapshot_manager,
+                           std::unique_ptr<FileStoreScan>&& scan,
+                           const std::shared_ptr<IndexFileHandler>& 
index_file_handler)
+        : snapshot_manager_(snapshot_manager),
+          scan_(std::move(scan)),
+          index_file_handler_(index_file_handler) {}
+
+    Result<int64_t> LatestCommittedIdentifier(const std::string& user) const 
override {
+        // TODO(yonghao.fyh): in java paimon is 
LatestSnapshotOfUserFromFileSystem
+        PAIMON_ASSIGN_OR_RAISE(std::optional<Snapshot> latest_snapshot,
+                               snapshot_manager_->LatestSnapshotOfUser(user));
+        if (latest_snapshot) {
+            return latest_snapshot.value().CommitIdentifier();
+        }
+        return std::numeric_limits<int64_t>::min();
+    }
+
+    Result<std::shared_ptr<RestoreFiles>> GetRestoreFiles(
+        const BinaryRow& partition, int32_t bucket,
+        bool scan_deletion_vectors_index) const override {
+        // TODO(yonghao.fyh): java paimon doesn't use 
snapshot_manager.LatestSnapshot() here,
+        // because they don't want to flood the catalog with high concurrency
+        PAIMON_ASSIGN_OR_RAISE(std::optional<Snapshot> snapshot,
+                               snapshot_manager_->LatestSnapshot());
+        if (snapshot == std::nullopt) {
+            return RestoreFiles::Empty();
+        }
+
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileStoreScan::RawPlan> plan,
+                               
scan_->WithSnapshot(snapshot.value())->CreatePlan());
+        std::vector<ManifestEntry> entries = plan->Files();

Review Comment:
   GetRestoreFiles() dereferences scan_ (scan_->WithSnapshot(...)) without 
validating that it is non-null. This can segfault whenever a latest snapshot 
exists but the restore was constructed with a null scan (some tests currently 
pass a null scan when they only exercise the other methods).



##########
src/paimon/core/operation/abstract_file_store_write.cpp:
##########
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "paimon/core/operation/abstract_file_store_write.h"
+
+#include <algorithm>
+#include <cassert>
+#include <map>
+#include <optional>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/core/operation/file_store_scan.h"
+#include "paimon/core/operation/file_system_write_restore.h"
+#include "paimon/core/operation/metrics/compaction_metrics.h"
+#include "paimon/core/operation/restore_files.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/snapshot.h"
+#include "paimon/core/table/bucket_mode.h"
+#include "paimon/core/table/sink/commit_message_impl.h"
+#include "paimon/core/utils/batch_writer.h"
+#include "paimon/core/utils/commit_increment.h"
+#include "paimon/core/utils/file_store_path_factory.h"
+#include "paimon/core/utils/snapshot_manager.h"
+#include "paimon/macros.h"
+#include "paimon/record_batch.h"
+#include "paimon/scan_context.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+class Executor;
+class MemoryPool;
+
+AbstractFileStoreWrite::AbstractFileStoreWrite(
+    const std::shared_ptr<FileStorePathFactory>& file_store_path_factory,
+    const std::shared_ptr<SnapshotManager>& snapshot_manager,
+    const std::shared_ptr<SchemaManager>& schema_manager, const std::string& 
commit_user,
+    const std::string& root_path, const std::shared_ptr<TableSchema>& 
table_schema,
+    const std::shared_ptr<arrow::Schema>& schema,
+    const std::shared_ptr<arrow::Schema>& write_schema,
+    const std::shared_ptr<arrow::Schema>& partition_schema,
+    const std::shared_ptr<BucketedDvMaintainer::Factory>& 
dv_maintainer_factory,
+    const std::shared_ptr<IOManager>& io_manager, const CoreOptions& options,
+    bool ignore_previous_files, bool is_streaming_mode, bool 
ignore_num_bucket_check,
+    const std::shared_ptr<Executor>& executor, const 
std::shared_ptr<MemoryPool>& pool)
+    : pool_(pool),
+      executor_(executor),
+      file_store_path_factory_(file_store_path_factory),
+      snapshot_manager_(snapshot_manager),
+      schema_manager_(schema_manager),
+      commit_user_(commit_user),
+      root_path_(root_path),
+      schema_(schema),
+      write_schema_(write_schema),
+      table_schema_(table_schema),
+      partition_schema_(partition_schema),
+      dv_maintainer_factory_(dv_maintainer_factory),
+      io_manager_(io_manager),
+      options_(options),
+      compact_executor_(CreateDefaultExecutor(4)),
+      compaction_metrics_(std::make_shared<CompactionMetrics>()),
+      ignore_previous_files_(ignore_previous_files),
+      is_streaming_mode_(is_streaming_mode),
+      ignore_num_bucket_check_(ignore_num_bucket_check),
+      metrics_(std::make_shared<MetricsImpl>()),
+      logger_(Logger::GetLogger("AbstractFileStoreWrite")) {
+    writer_memory_manager_ =
+        
std::make_unique<WriterMemoryManager>(static_cast<uint64_t>(options.GetWriteBufferSize()));
+    cache_manager_ = 
std::make_shared<CacheManager>(options.GetLookupCacheMaxMemory(),
+                                                    
options.GetLookupCacheHighPrioPoolRatio());
+}
+
+Status AbstractFileStoreWrite::Write(std::unique_ptr<RecordBatch>&& batch) {
+    if (PAIMON_UNLIKELY(batch == nullptr)) {
+        return Status::Invalid("batch is null pointer");
+    }
+    // in FileStoreWrite::Create() we have checked the table kind and bucket 
mode, here we only
+    // check the bucket id in batch
+    if (options_.GetBucket() == -1) {
+        assert(table_schema_->PrimaryKeys().empty());
+        if (!batch->HasSpecifiedBucket()) {
+            batch->SetBucket(BucketModeDefine::UNAWARE_BUCKET);
+        } else if (batch->GetBucket() != BucketModeDefine::UNAWARE_BUCKET) {
+            return Status::Invalid(
+                fmt::format("batch bucket is {} while options bucket is -1", 
batch->GetBucket()));
+        }
+    } else if (options_.GetBucket() == BucketModeDefine::POSTPONE_BUCKET) {
+        assert(!table_schema_->PrimaryKeys().empty());
+        if (!batch->HasSpecifiedBucket()) {
+            batch->SetBucket(BucketModeDefine::POSTPONE_BUCKET);
+        } else if (batch->GetBucket() != BucketModeDefine::POSTPONE_BUCKET) {
+            return Status::Invalid(
+                fmt::format("batch bucket is {} while options bucket is -2", 
batch->GetBucket()));
+        }
+    } else {
+        assert(options_.GetBucket() > 0);
+        if (!(batch->GetBucket() >= 0 && batch->GetBucket() < 
options_.GetBucket())) {
+            return Status::Invalid(
+                fmt::format("fixed bucketed mode must specify a bucket which 
in [0, {}) in "
+                            "RecordBatch",
+                            options_.GetBucket()));
+        }
+    }
+    // check nullability
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::shared_ptr<arrow::Array> data,
+        arrow::ImportArray(batch->GetData(), 
arrow::struct_(write_schema_->fields())));
+    PAIMON_RETURN_NOT_OK(ArrowUtils::CheckNullabilityMatch(write_schema_, 
data));
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*data, 
batch->GetData()));
+
+    PAIMON_ASSIGN_OR_RAISE(BinaryRow partition,
+                           
file_store_path_factory_->ToBinaryRow(batch->GetPartition()))
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<BatchWriter> writer,
+                           GetWriter(partition, batch->GetBucket()));

Review Comment:
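   Missing semicolon after the PAIMON_ASSIGN_OR_RAISE(...) call that assigns 
`partition`. If the macro expansion does not end with its own `;`, the following 
statement is merged into this one and compilation fails; if it does, the code 
only builds by accident. Either way, add the semicolon to match the other call 
sites.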



##########
src/paimon/core/operation/write_context.cpp:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "paimon/write_context.h"
+
+#include <utility>
+
+#include "arrow/util/thread_pool.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/utils/branch_manager.h"
+#include "paimon/executor.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+WriteContext::WriteContext(const std::string& root_path, const std::string& 
commit_user,
+                           bool is_streaming_mode, bool 
ignore_num_bucket_check,
+                           bool ignore_previous_files, bool 
enable_multi_thread_spill,
+                           const std::optional<int32_t>& write_id, const 
std::string& branch,
+                           const std::vector<std::string>& write_schema,
+                           const std::shared_ptr<MemoryPool>& memory_pool,
+                           const std::shared_ptr<Executor>& executor,
+                           const std::string& temp_directory,
+                           const std::shared_ptr<FileSystem>& 
specific_file_system,
+                           const std::map<std::string, std::string>& 
fs_scheme_to_identifier_map,
+                           const std::map<std::string, std::string>& options)
+    : root_path_(root_path),
+      commit_user_(commit_user),
+      branch_(branch),
+      is_streaming_mode_(is_streaming_mode),
+      ignore_num_bucket_check_(ignore_num_bucket_check),
+      ignore_previous_files_(ignore_previous_files),
+      enable_multi_thread_spill_(enable_multi_thread_spill),
+      write_id_(write_id),
+      write_schema_(write_schema),
+      memory_pool_(memory_pool),
+      executor_(executor),
+      temp_directory_(temp_directory),
+      specific_file_system_(specific_file_system),
+      fs_scheme_to_identifier_map_(fs_scheme_to_identifier_map),
+      options_(options) {}
+
+WriteContext::~WriteContext() = default;
+
+class WriteContextBuilder::Impl {
+ public:
+    friend class WriteContextBuilder;
+
+    void Reset() {
+        write_id_ = std::nullopt;
+        is_streaming_mode_ = false;
+        ignore_num_bucket_check_ = false;
+        ignore_previous_files_ = false;
+        spill_thread_number_ = 0;
+        memory_pool_ = GetDefaultPool();
+        executor_ = CreateDefaultExecutor();
+        temp_directory_.clear();
+        branch_ = BranchManager::DEFAULT_MAIN_BRANCH;
+        write_schema_.clear();
+        fs_scheme_to_identifier_map_.clear();
+        specific_file_system_.reset();
+        options_.clear();
+    }
+
+ private:
+    std::string root_path_;
+    std::string commit_user_;
+    std::string branch_ = BranchManager::DEFAULT_MAIN_BRANCH;
+    std::optional<int32_t> write_id_;
+    bool is_streaming_mode_ = false;
+    bool ignore_num_bucket_check_ = false;
+    bool ignore_previous_files_ = false;
+    int32_t spill_thread_number_ = 0;
+    std::vector<std::string> write_schema_;
+    std::shared_ptr<MemoryPool> memory_pool_ = GetDefaultPool();
+    std::shared_ptr<Executor> executor_ = CreateDefaultExecutor();
+    std::string temp_directory_;
+    std::map<std::string, std::string> fs_scheme_to_identifier_map_;
+    std::shared_ptr<FileSystem> specific_file_system_;
+    std::map<std::string, std::string> options_;
+};
+
+WriteContextBuilder::WriteContextBuilder(const std::string& root_path,
+                                         const std::string& commit_user)
+    : impl_(std::make_unique<Impl>()) {
+    impl_->root_path_ = root_path;
+    impl_->commit_user_ = commit_user;
+}
+
+WriteContextBuilder::~WriteContextBuilder() = default;
+
+WriteContextBuilder& WriteContextBuilder::AddOption(const std::string& key,
+                                                    const std::string& value) {
+    impl_->options_[key] = value;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::SetOptions(
+    const std::map<std::string, std::string>& opts) {
+    impl_->options_ = opts;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithStreamingMode(bool 
is_streaming_mode) {
+    impl_->is_streaming_mode_ = is_streaming_mode;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithIgnoreNumBucketCheck(bool 
ignore_num_bucket_check) {
+    impl_->ignore_num_bucket_check_ = ignore_num_bucket_check;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithMemoryPool(
+    const std::shared_ptr<MemoryPool>& memory_pool) {
+    impl_->memory_pool_ = memory_pool;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithIgnorePreviousFiles(bool 
ignore_previous_files) {
+    impl_->ignore_previous_files_ = ignore_previous_files;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithExecutor(const 
std::shared_ptr<Executor>& executor) {
+    impl_->executor_ = executor;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithTempDirectory(const std::string& 
temp_dir) {
+    impl_->temp_directory_ = temp_dir;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithWriteId(int32_t write_id) {
+    impl_->write_id_ = write_id;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithBranch(const std::string& 
branch) {
+    impl_->branch_ = branch;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithWriteSchema(
+    const std::vector<std::string>& write_schema) {
+    impl_->write_schema_ = write_schema;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithFileSystemSchemeToIdentifierMap(
+    const std::map<std::string, std::string>& fs_scheme_to_identifier_map) {
+    impl_->fs_scheme_to_identifier_map_ = fs_scheme_to_identifier_map;
+    return *this;
+}
+
+WriteContextBuilder& 
WriteContextBuilder::SetWriteBufferSpillThreadNumber(int32_t thread_number) {
+    impl_->spill_thread_number_ = thread_number;
+    return *this;
+}
+
+WriteContextBuilder& WriteContextBuilder::WithFileSystem(
+    const std::shared_ptr<FileSystem>& file_system) {
+    impl_->specific_file_system_ = file_system;
+    return *this;
+}
+
+Result<std::unique_ptr<WriteContext>> WriteContextBuilder::Finish() {
+    PAIMON_ASSIGN_OR_RAISE(impl_->root_path_, 
PathUtil::NormalizePath(impl_->root_path_));
+    if (impl_->root_path_.empty()) {
+        return Status::Invalid("root path is empty");
+    }
+    bool enable_multi_thread_spill = impl_->spill_thread_number_ > 0;

Review Comment:
   Finish() documents that WithFileSystemSchemeToIdentifierMap() cannot be used 
together with WithFileSystem(), but the builder does not enforce this. Setting 
both can lead to ambiguous or incorrect file-system selection at runtime; 
consider having Finish() return Status::Invalid when both are set.



##########
src/paimon/core/operation/write_restore.cpp:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "paimon/core/operation/write_restore.h"
+
+#include <memory>
+#include <optional>
+#include <vector>
+
+namespace paimon {
+
+Result<std::optional<int32_t>> WriteRestore::ExtractDataFiles(
+    const std::vector<ManifestEntry>& entries,
+    std::vector<std::shared_ptr<DataFileMeta>>* data_files) {
+    std::optional<int32_t> total_buckets;
+    for (const auto& entry : entries) {
+        if (total_buckets.has_value() && total_buckets.value() != 
entry.TotalBuckets()) {
+            return Status::Invalid(fmt::format(
+                "Bucket data files has different total bucket number, {} vs 
{}, this should "
+                "be a bug.",
+                total_buckets.value(), entry.TotalBuckets()));

Review Comment:
   The error message grammar is incorrect ("has" with plural "files"); e.g. 
"Bucket data files have different total bucket numbers ({} vs {}); this is 
likely a bug." Keeping messages polished helps with debugging and log search.



##########
src/paimon/core/operation/restore_files.h:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include "paimon/core/index/index_file_meta.h"
+#include "paimon/core/io/data_file_meta.h"
+#include "paimon/core/snapshot.h"
+
+namespace paimon {
+
+/// Restored files with snapshot and total buckets.
+class RestoreFiles {
+ public:
+    RestoreFiles() = default;
+
+    RestoreFiles(const std::optional<Snapshot>& snapshot,
+                 const std::optional<int32_t>& total_buckets,
+                 const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
+                 const std::shared_ptr<IndexFileMeta>& dynamic_bucket_index,
+                 const std::vector<std::shared_ptr<IndexFileMeta>>& 
delete_vectors_index)
+        : snapshot_(snapshot),
+          total_buckets_(total_buckets),
+          data_files_(data_files),
+          dynamic_bucket_index_(dynamic_bucket_index),
+          delete_vectors_index_(delete_vectors_index) {}
+
+    std::optional<Snapshot> GetSnapshot() const {
+        return snapshot_;
+    }
+    std::optional<int32_t> TotalBuckets() const {
+        return total_buckets_;
+    }
+    std::vector<std::shared_ptr<DataFileMeta>> DataFiles() const {
+        return data_files_;
+    }
+    std::shared_ptr<IndexFileMeta> DynamicBucketIndex() const {
+        return dynamic_bucket_index_;
+    }
+    std::vector<std::shared_ptr<IndexFileMeta>> DeleteVectorsIndex() const {
+        return delete_vectors_index_;
+    }

Review Comment:
   RestoreFiles getters return their members by value, so every call copies the 
vectors (and the optional<Snapshot>). This is on the write/restore path and can 
add avoidable overhead; returning const references avoids repeated allocations 
and copies.



##########
src/paimon/core/operation/file_system_write_restore.h:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#pragma once
+
+#include <limits>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "paimon/core/core_options.h"
+#include "paimon/core/index/index_file_handler.h"
+#include "paimon/core/operation/file_store_scan.h"
+#include "paimon/core/operation/restore_files.h"
+#include "paimon/core/operation/write_restore.h"
+#include "paimon/core/utils/snapshot_manager.h"
+
+namespace paimon {
+
+/// `WriteRestore` to restore files directly from file system.
+class FileSystemWriteRestore : public WriteRestore {
+ public:
+    FileSystemWriteRestore(const std::shared_ptr<SnapshotManager>& 
snapshot_manager,
+                           std::unique_ptr<FileStoreScan>&& scan,
+                           const std::shared_ptr<IndexFileHandler>& 
index_file_handler)
+        : snapshot_manager_(snapshot_manager),
+          scan_(std::move(scan)),
+          index_file_handler_(index_file_handler) {}
+
+    Result<int64_t> LatestCommittedIdentifier(const std::string& user) const 
override {
+        // TODO(yonghao.fyh): in java paimon is 
LatestSnapshotOfUserFromFileSystem
+        PAIMON_ASSIGN_OR_RAISE(std::optional<Snapshot> latest_snapshot,
+                               snapshot_manager_->LatestSnapshotOfUser(user));
+        if (latest_snapshot) {
+            return latest_snapshot.value().CommitIdentifier();
+        }
+        return std::numeric_limits<int64_t>::min();
+    }
+
+    Result<std::shared_ptr<RestoreFiles>> GetRestoreFiles(
+        const BinaryRow& partition, int32_t bucket,
+        bool scan_deletion_vectors_index) const override {
+        // TODO(yonghao.fyh): java paimon doesn't use 
snapshot_manager.LatestSnapshot() here,
+        // because they don't want to flood the catalog with high concurrency
+        PAIMON_ASSIGN_OR_RAISE(std::optional<Snapshot> snapshot,
+                               snapshot_manager_->LatestSnapshot());
+        if (snapshot == std::nullopt) {
+            return RestoreFiles::Empty();
+        }
+
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileStoreScan::RawPlan> plan,
+                               
scan_->WithSnapshot(snapshot.value())->CreatePlan());
+        std::vector<ManifestEntry> entries = plan->Files();
+        std::vector<std::shared_ptr<DataFileMeta>> restore_data_files;
+        PAIMON_ASSIGN_OR_RAISE(std::optional<int32_t> total_buckets,
+                               WriteRestore::ExtractDataFiles(entries, 
&restore_data_files));
+
+        std::vector<std::shared_ptr<IndexFileMeta>> deletion_vectors_index;
+        if (scan_deletion_vectors_index) {
+            PAIMON_ASSIGN_OR_RAISE(
+                deletion_vectors_index,
+                index_file_handler_->Scan(
+                    snapshot.value(), 
std::string(DeletionVectorsIndexFile::DELETION_VECTORS_INDEX),
+                    partition, bucket));
+        }

Review Comment:
   When scan_deletion_vectors_index is true, index_file_handler_ is 
dereferenced without a null check. This can crash if the handler wasn't 
provided (e.g., DV disabled or constructed with nullptr).



##########
src/paimon/core/operation/write_restore.cpp:
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "paimon/core/operation/write_restore.h"
+
+#include <memory>
+#include <optional>
+#include <vector>
+
+namespace paimon {
+
+Result<std::optional<int32_t>> WriteRestore::ExtractDataFiles(
+    const std::vector<ManifestEntry>& entries,
+    std::vector<std::shared_ptr<DataFileMeta>>* data_files) {
+    std::optional<int32_t> total_buckets;
+    for (const auto& entry : entries) {

Review Comment:
   ExtractDataFiles() does not validate the output pointer and will crash if 
data_files is null. It also appends to whatever content is already in 
*data_files, which is surprising for a function named “Extract…”.



##########
src/paimon/core/operation/file_store_write.cpp:
##########
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include "paimon/file_store_write.h"
+
+#include <map>
+#include <string>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/fields_comparator.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/disk/io_manager.h"
+#include "paimon/core/manifest/index_manifest_file.h"
+#include "paimon/core/mergetree/compact/lookup_merge_function.h"
+#include "paimon/core/mergetree/compact/merge_function.h"
+#include "paimon/core/mergetree/compact/reducer_merge_function_wrapper.h"
+#include "paimon/core/operation/append_only_file_store_write.h"
+#include "paimon/core/operation/key_value_file_store_write.h"
+#include "paimon/core/options/merge_engine.h"
+#include "paimon/core/postpone/postpone_bucket_file_store_write.h"
+#include "paimon/core/schema/schema_manager.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/table/bucket_mode.h"
+#include "paimon/core/utils/field_mapping.h"
+#include "paimon/core/utils/file_store_path_factory.h"
+#include "paimon/core/utils/primary_key_table_utils.h"
+#include "paimon/core/utils/snapshot_manager.h"
+#include "paimon/format/file_format.h"
+#include "paimon/result.h"
+#include "paimon/write_context.h"
+
+namespace arrow {
+class Schema;
+}  // namespace arrow
+
+namespace paimon {
+struct KeyValue;
+template <typename T>
+class MergeFunctionWrapper;
+
+Result<std::unique_ptr<FileStoreWrite>> 
FileStoreWrite::Create(std::unique_ptr<WriteContext> ctx) {
+    if (ctx == nullptr) {
+        return Status::Invalid("write context is null pointer");
+    }
+    if (ctx->GetMemoryPool() == nullptr) {
+        return Status::Invalid("memory pool is null pointer");
+    }
+    if (ctx->GetExecutor() == nullptr) {
+        return Status::Invalid("executor is null pointer");
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(CoreOptions tmp_options,
+                           CoreOptions::FromMap(ctx->GetOptions(), 
ctx->GetSpecificFileSystem(),
+                                                
ctx->GetFileSystemSchemeToIdentifierMap()));
+    std::string branch = ctx->GetBranch();
+    auto schema_manager =
+        std::make_shared<SchemaManager>(tmp_options.GetFileSystem(), 
ctx->GetRootPath(), branch);
+    PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> 
table_schema,
+                           schema_manager->Latest());
+    if (table_schema == std::nullopt) {
+        return Status::Invalid(fmt::format("cannot found latest schema in 
branch {}", branch));
+    }

Review Comment:
   Error message grammar: "cannot found" should be "cannot find".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to