github-actions[bot] commented on code in PR #25187:
URL: https://github.com/apache/doris/pull/25187#discussion_r1366424991


##########
be/src/cloud/config.h:
##########
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/config.h"
+
+namespace doris {
+namespace config {

Review Comment:
   warning: nested namespaces can be concatenated 
[modernize-concat-nested-namespaces]
   
   ```suggestion
   namespace doris::config {
   ```
   
   be/src/cloud/config.h:26:
   ```diff
   - } // namespace config
   - } // namespace doris
   + } // namespace doris
   ```
   



##########
be/src/io/fs/s3_file_bufferpool.cpp:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "s3_file_bufferpool.h"
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "io/cache/block/block_file_segment.h"
+#include "io/fs/s3_common.h"
+#include "runtime/exec_env.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace io {

Review Comment:
   warning: nested namespaces can be concatenated 
[modernize-concat-nested-namespaces]
   
   ```suggestion
   namespace doris::io {
   ```
   
   be/src/io/fs/s3_file_bufferpool.cpp:334:
   ```diff
   - } // namespace io
   - } // namespace doris
   + } // namespace doris
   ```
   



##########
be/src/io/fs/s3_file_bufferpool.cpp:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "s3_file_bufferpool.h"
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "io/cache/block/block_file_segment.h"
+#include "io/fs/s3_common.h"
+#include "runtime/exec_env.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace io {
+
+/**
+ * 0. check if the inner memory buffer is empty or not
+ * 1. reclaim the memory buffer if it's not empty
+ */
+void FileBuffer::on_finish() {
+    if (_buffer.empty()) {
+        return;
+    }
+    S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(), 
_capacity});
+    _buffer.clear();
+}
+
+/**
+ * take other buffer's memory space and refresh capacity
+ */
+void FileBuffer::swap_buffer(Slice& other) {
+    _buffer = other;
+    _capacity = _buffer.get_size();
+    other.clear();
+}
+
+FileBuffer::FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, 
size_t offset,
+                       OperationState state, bool reserve)
+        : _alloc_holder(std::move(alloc_holder)),
+          _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)),
+          _offset(offset),
+          _size(0),
+          _state(std::move(state)),
+          _capacity(_buffer.get_size()) {}
+
+/**
+ * 0. check if file cache holder allocated
+ * 1. update the cache's type to index cache
+ */
+void UploadFileBuffer::set_index_offset(size_t offset) {
+    _index_offset = offset;
+    if (_holder) {
+        bool change_to_index_cache = false;
+        for (auto iter = _holder->file_segments.begin(); iter != 
_holder->file_segments.end();
+             ++iter) {
+            if (iter == _cur_file_segment) {
+                change_to_index_cache = true;
+            }
+            if (change_to_index_cache) {
+                
static_cast<void>((*iter)->change_cache_type_self(CacheType::INDEX));
+            }
+        }
+    }
+}
+
+/**
+ * 0. when there is memory preserved, directly write data to buf
+ * 1. write to file cache otherwise, then we'll wait for free buffer and to 
rob it
+ */
+Status UploadFileBuffer::append_data(const Slice& data) {
+    Defer defer {[&] { _size += data.get_size(); }};
+    while (true) {
+        // if buf is not empty, it means there is memory preserved for this buf
+        if (!_buffer.empty()) {
+            std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(), 
data.get_size());
+            break;
+        }
+        // if the buf has no memory reserved, then write to disk first
+        if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder 
!= nullptr) {
+            _holder = _alloc_holder();
+            bool cache_is_not_enough = false;
+            for (auto& segment : _holder->file_segments) {
+                DCHECK(segment->state() == FileBlock::State::SKIP_CACHE ||
+                       segment->state() == FileBlock::State::EMPTY);
+                if (segment->state() == FileBlock::State::SKIP_CACHE) 
[[unlikely]] {
+                    cache_is_not_enough = true;
+                    break;
+                }
+                if (_index_offset != 0) {
+                    
static_cast<void>(segment->change_cache_type_self(CacheType::INDEX));
+                }
+            }
+            // if cache_is_not_enough, cannot use it !
+            _cur_file_segment = _holder->file_segments.begin();
+            _append_offset = (*_cur_file_segment)->range().left;
+            _holder = cache_is_not_enough ? nullptr : std::move(_holder);
+            if (_holder) {
+                (*_cur_file_segment)->get_or_set_downloader();
+            }
+            _is_cache_allocated = true;
+        }
+        if (_holder) [[likely]] {
+            size_t data_remain_size = data.get_size();
+            size_t pos = 0;
+            while (data_remain_size != 0) {
+                auto range = (*_cur_file_segment)->range();
+                size_t segment_remain_size = range.right - _append_offset + 1;
+                size_t append_size = std::min(data_remain_size, 
segment_remain_size);
+                Slice append_data(data.get_data() + pos, append_size);
+                // When there is no available free memory buffer, the data 
will be written to the cache first
+                // and then uploaded to S3 when there is an available free 
memory buffer.
+                // However, if an error occurs during the write process to the 
local cache,
+                // continuing to upload the dirty data from the cache to S3 
will result in erroneous data(Bad segment).
+                // Considering that local disk write failures are rare, a 
simple approach is chosen here,
+                // which is to treat the import as a failure directly when a 
local write failure occurs
+                RETURN_IF_ERROR((*_cur_file_segment)->append(append_data));
+                if (segment_remain_size == append_size) {
+                    RETURN_IF_ERROR((*_cur_file_segment)->finalize_write());
+                    if (++_cur_file_segment != _holder->file_segments.end()) {
+                        (*_cur_file_segment)->get_or_set_downloader();
+                    }
+                }
+                data_remain_size -= append_size;
+                _append_offset += append_size;
+                pos += append_size;
+            }
+            break;
+        } else {
+            // wait allocate buffer pool
+            auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+            swap_buffer(tmp);
+        }
+    }
+    return Status::OK();
+}
+
+/**
+ * 0. allocate one memory buffer
+ * 1. read the content from the cache and then write
+ * it into memory buffer
+ */
+void UploadFileBuffer::read_from_cache() {
+    auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+    swap_buffer(tmp);
+
+    DCHECK(_holder != nullptr);
+    DCHECK(_capacity >= _size);
+    size_t pos = 0;
+    for (auto& segment : _holder->file_segments) {
+        if (pos == _size) {
+            break;
+        }
+        if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+        size_t segment_size = segment->range().size();
+        Slice s(_buffer.get_data() + pos, segment_size);
+        if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] {
+            set_val(std::move(st));
+            return;
+        }
+        pos += segment_size;
+    }
+
+    // the real length should be the buf.get_size() in this situation(consider 
it's the last part,
+    // size of it could be less than 5MB)
+    _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+}
+
+/**
+ * 0. construct the stream ptr if the buffer is not empty
+ * 1. submit the on_upload() callback to executor
+ */
+void UploadFileBuffer::submit() {
+    if (!_buffer.empty()) [[likely]] {
+        _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+    }
+    // If the data is written into file cache
+    if (_holder && _cur_file_segment != _holder->file_segments.end()) {
+        if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok()) 
[[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+    }
+    
static_cast<void>(S3FileBufferPool::GetInstance()->thread_pool()->submit_func(
+            [buf = this->shared_from_this(), this]() {
+                // to extend buf's lifetime
+                // (void)buf;
+                on_upload();
+            }));
+}
+
+/**
+ * write the content of the memory buffer to local file cache
+ */
+void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
+    if (!config::enable_file_cache || _alloc_holder == nullptr) {
+        return;
+    }
+    if (_holder) {
+        return;
+    }
+    if (is_cancelled) {
+        return;
+    }
+    // the data is already written to S3 in this situation
+    // so i didn't handle the file cache write error
+    _holder = _alloc_holder();
+    size_t pos = 0;
+    size_t data_remain_size = _size;
+    for (auto& segment : _holder->file_segments) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t segment_size = segment->range().size();
+        size_t append_size = std::min(data_remain_size, segment_size);
+        if (segment->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && segment->range().right >= _index_offset) 
{
+                // segment->change_cache_type_self(CacheType::INDEX);
+            }
+            segment->get_or_set_downloader();
+            // Another thread may have started downloading due to a query
+            // Just skip putting to cache from UploadFileBuffer
+            if (segment->is_downloader()) {
+                Slice s(_buffer.get_data() + pos, append_size);
+                if (auto st = segment->append(s); !st.ok()) [[unlikely]] {
+                    LOG_WARNING("append data to cache segmetn failed due to 
{}", st);
+                    return;
+                }
+                if (auto st = segment->finalize_write(); !st.ok()) 
[[unlikely]] {
+                    LOG_WARNING("finalize write to cache segmetn failed due to 
{}", st);
+                    return;
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) {
+    _type = type;
+    return *this;
+}
+FileBufferBuilder& FileBufferBuilder::set_upload_callback(
+        std::function<void(UploadFileBuffer& buf)> cb) {
+    _upload_cb = std::move(cb);
+    return *this;
+}
+// set callback to do task sync for the caller
+FileBufferBuilder& 
FileBufferBuilder::set_sync_after_complete_task(std::function<bool(Status)> cb) 
{
+    _sync_after_complete_task = std::move(cb);
+    return *this;
+}
+
+FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder(
+        std::function<FileBlocksHolderPtr()> cb) {
+    _alloc_holder_cb = std::move(cb);
+    return *this;
+}
+
+std::shared_ptr<FileBuffer> FileBufferBuilder::build() {
+    OperationState state(_sync_after_complete_task, _is_cancelled);
+    if (_type == BufferType::UPLOAD) {
+        return std::make_shared<UploadFileBuffer>(std::move(_upload_cb), 
std::move(state), _offset,
+                                                  std::move(_alloc_holder_cb), 
_index_offset);
+    }
+    // should never come here
+    return nullptr;
+}
+
+void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t 
s3_write_buffer_size,
+                            ThreadPool* thread_pool) {
+    // the nums could be one configuration
+    size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
+    DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&

Review Comment:
   warning: 1024 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
       DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
                                           ^
   ```
   



##########
be/src/io/fs/s3_file_bufferpool.cpp:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "s3_file_bufferpool.h"
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "io/cache/block/block_file_segment.h"
+#include "io/fs/s3_common.h"
+#include "runtime/exec_env.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace io {
+
+/**
+ * 0. check if the inner memory buffer is empty or not
+ * 1. reclaim the memory buffer if it's not empty
+ */
+void FileBuffer::on_finish() {
+    if (_buffer.empty()) {
+        return;
+    }
+    S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(), 
_capacity});
+    _buffer.clear();
+}
+
+/**
+ * take other buffer's memory space and refresh capacity
+ */
+void FileBuffer::swap_buffer(Slice& other) {
+    _buffer = other;
+    _capacity = _buffer.get_size();
+    other.clear();
+}
+
+FileBuffer::FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, 
size_t offset,
+                       OperationState state, bool reserve)
+        : _alloc_holder(std::move(alloc_holder)),
+          _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)),
+          _offset(offset),
+          _size(0),
+          _state(std::move(state)),
+          _capacity(_buffer.get_size()) {}
+
+/**
+ * 0. check if file cache holder allocated
+ * 1. update the cache's type to index cache
+ */
+void UploadFileBuffer::set_index_offset(size_t offset) {
+    _index_offset = offset;
+    if (_holder) {
+        bool change_to_index_cache = false;
+        for (auto iter = _holder->file_segments.begin(); iter != 
_holder->file_segments.end();
+             ++iter) {
+            if (iter == _cur_file_segment) {
+                change_to_index_cache = true;
+            }
+            if (change_to_index_cache) {
+                
static_cast<void>((*iter)->change_cache_type_self(CacheType::INDEX));
+            }
+        }
+    }
+}
+
+/**
+ * 0. when there is memory preserved, directly write data to buf
+ * 1. write to file cache otherwise, then we'll wait for free buffer and to 
rob it
+ */
+Status UploadFileBuffer::append_data(const Slice& data) {
+    Defer defer {[&] { _size += data.get_size(); }};
+    while (true) {
+        // if buf is not empty, it means there is memory preserved for this buf
+        if (!_buffer.empty()) {
+            std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(), 
data.get_size());
+            break;
+        }
+        // if the buf has no memory reserved, then write to disk first
+        if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder 
!= nullptr) {
+            _holder = _alloc_holder();
+            bool cache_is_not_enough = false;
+            for (auto& segment : _holder->file_segments) {
+                DCHECK(segment->state() == FileBlock::State::SKIP_CACHE ||
+                       segment->state() == FileBlock::State::EMPTY);
+                if (segment->state() == FileBlock::State::SKIP_CACHE) 
[[unlikely]] {
+                    cache_is_not_enough = true;
+                    break;
+                }
+                if (_index_offset != 0) {
+                    
static_cast<void>(segment->change_cache_type_self(CacheType::INDEX));
+                }
+            }
+            // if cache_is_not_enough, cannot use it !
+            _cur_file_segment = _holder->file_segments.begin();
+            _append_offset = (*_cur_file_segment)->range().left;
+            _holder = cache_is_not_enough ? nullptr : std::move(_holder);
+            if (_holder) {
+                (*_cur_file_segment)->get_or_set_downloader();
+            }
+            _is_cache_allocated = true;
+        }
+        if (_holder) [[likely]] {
+            size_t data_remain_size = data.get_size();
+            size_t pos = 0;
+            while (data_remain_size != 0) {
+                auto range = (*_cur_file_segment)->range();
+                size_t segment_remain_size = range.right - _append_offset + 1;
+                size_t append_size = std::min(data_remain_size, 
segment_remain_size);
+                Slice append_data(data.get_data() + pos, append_size);
+                // When there is no available free memory buffer, the data 
will be written to the cache first
+                // and then uploaded to S3 when there is an available free 
memory buffer.
+                // However, if an error occurs during the write process to the 
local cache,
+                // continuing to upload the dirty data from the cache to S3 
will result in erroneous data(Bad segment).
+                // Considering that local disk write failures are rare, a 
simple approach is chosen here,
+                // which is to treat the import as a failure directly when a 
local write failure occurs
+                RETURN_IF_ERROR((*_cur_file_segment)->append(append_data));
+                if (segment_remain_size == append_size) {
+                    RETURN_IF_ERROR((*_cur_file_segment)->finalize_write());
+                    if (++_cur_file_segment != _holder->file_segments.end()) {
+                        (*_cur_file_segment)->get_or_set_downloader();
+                    }
+                }
+                data_remain_size -= append_size;
+                _append_offset += append_size;
+                pos += append_size;
+            }
+            break;
+        } else {
+            // wait allocate buffer pool
+            auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+            swap_buffer(tmp);
+        }
+    }
+    return Status::OK();
+}
+
+/**
+ * 0. allocate one memory buffer
+ * 1. read the content from the cache and then write
+ * it into memory buffer
+ */
+void UploadFileBuffer::read_from_cache() {
+    auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+    swap_buffer(tmp);
+
+    DCHECK(_holder != nullptr);
+    DCHECK(_capacity >= _size);
+    size_t pos = 0;
+    for (auto& segment : _holder->file_segments) {
+        if (pos == _size) {
+            break;
+        }
+        if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+        size_t segment_size = segment->range().size();
+        Slice s(_buffer.get_data() + pos, segment_size);
+        if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] {
+            set_val(std::move(st));
+            return;
+        }
+        pos += segment_size;
+    }
+
+    // the real length should be the buf.get_size() in this situation(consider 
it's the last part,
+    // size of it could be less than 5MB)
+    _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+}
+
+/**
+ * 0. construct the stream ptr if the buffer is not empty
+ * 1. submit the on_upload() callback to executor
+ */
+void UploadFileBuffer::submit() {
+    if (!_buffer.empty()) [[likely]] {
+        _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+    }
+    // If the data is written into file cache
+    if (_holder && _cur_file_segment != _holder->file_segments.end()) {
+        if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok()) 
[[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+    }
+    
static_cast<void>(S3FileBufferPool::GetInstance()->thread_pool()->submit_func(
+            [buf = this->shared_from_this(), this]() {
+                // to extend buf's lifetime
+                // (void)buf;
+                on_upload();
+            }));
+}
+
+/**
+ * write the content of the memory buffer to local file cache
+ */
+void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
+    if (!config::enable_file_cache || _alloc_holder == nullptr) {
+        return;
+    }
+    if (_holder) {
+        return;
+    }
+    if (is_cancelled) {
+        return;
+    }
+    // the data is already written to S3 in this situation
+    // so i didn't handle the file cache write error
+    _holder = _alloc_holder();
+    size_t pos = 0;
+    size_t data_remain_size = _size;
+    for (auto& segment : _holder->file_segments) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t segment_size = segment->range().size();
+        size_t append_size = std::min(data_remain_size, segment_size);
+        if (segment->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && segment->range().right >= _index_offset) 
{
+                // segment->change_cache_type_self(CacheType::INDEX);
+            }
+            segment->get_or_set_downloader();
+            // Another thread may have started downloading due to a query
+            // Just skip putting to cache from UploadFileBuffer
+            if (segment->is_downloader()) {
+                Slice s(_buffer.get_data() + pos, append_size);
+                if (auto st = segment->append(s); !st.ok()) [[unlikely]] {
+                    LOG_WARNING("append data to cache segmetn failed due to 
{}", st);
+                    return;
+                }
+                if (auto st = segment->finalize_write(); !st.ok()) 
[[unlikely]] {
+                    LOG_WARNING("finalize write to cache segmetn failed due to 
{}", st);
+                    return;
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) {
+    _type = type;
+    return *this;
+}
+FileBufferBuilder& FileBufferBuilder::set_upload_callback(
+        std::function<void(UploadFileBuffer& buf)> cb) {
+    _upload_cb = std::move(cb);
+    return *this;
+}
+// set callback to do task sync for the caller
+FileBufferBuilder& 
FileBufferBuilder::set_sync_after_complete_task(std::function<bool(Status)> cb) 
{
+    _sync_after_complete_task = std::move(cb);
+    return *this;
+}
+
+FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder(
+        std::function<FileBlocksHolderPtr()> cb) {
+    _alloc_holder_cb = std::move(cb);
+    return *this;
+}
+
+std::shared_ptr<FileBuffer> FileBufferBuilder::build() {
+    OperationState state(_sync_after_complete_task, _is_cancelled);
+    if (_type == BufferType::UPLOAD) {
+        return std::make_shared<UploadFileBuffer>(std::move(_upload_cb), 
std::move(state), _offset,
+                                                  std::move(_alloc_holder_cb), 
_index_offset);
+    }
+    // should never come here
+    return nullptr;
+}
+
+void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t 
s3_write_buffer_size,
+                            ThreadPool* thread_pool) {
+    // the nums could be one configuration
+    size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
+    DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&

Review Comment:
   warning: 1024 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
       DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
                                                  ^
   ```
   



##########
be/src/io/fs/s3_file_bufferpool.cpp:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "s3_file_bufferpool.h"
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "io/cache/block/block_file_segment.h"
+#include "io/fs/s3_common.h"
+#include "runtime/exec_env.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace io {
+
+/**
+ * 0. check if the inner memory buffer is empty or not
+ * 1. reclaim the memory buffer if it's not empty
+ */
+void FileBuffer::on_finish() {
+    if (_buffer.empty()) {
+        return;
+    }
+    S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(), 
_capacity});
+    _buffer.clear();
+}
+
+/**
+ * take other buffer's memory space and refresh capacity
+ */
+void FileBuffer::swap_buffer(Slice& other) {
+    _buffer = other;
+    _capacity = _buffer.get_size();
+    other.clear();
+}
+
+FileBuffer::FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, 
size_t offset,
+                       OperationState state, bool reserve)
+        : _alloc_holder(std::move(alloc_holder)),
+          _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)),
+          _offset(offset),
+          _size(0),
+          _state(std::move(state)),
+          _capacity(_buffer.get_size()) {}
+
+/**
+ * 0. check if file cache holder allocated
+ * 1. update the cache's type to index cache
+ */
+void UploadFileBuffer::set_index_offset(size_t offset) {
+    _index_offset = offset;
+    if (_holder) {
+        bool change_to_index_cache = false;
+        for (auto iter = _holder->file_segments.begin(); iter != 
_holder->file_segments.end();
+             ++iter) {
+            if (iter == _cur_file_segment) {
+                change_to_index_cache = true;
+            }
+            if (change_to_index_cache) {
+                
static_cast<void>((*iter)->change_cache_type_self(CacheType::INDEX));
+            }
+        }
+    }
+}
+
+/**
+ * 0. when there is memory preserved, directly write data to buf
+ * 1. write to file cache otherwise, then wait for a free buffer and take 
it over
+ */
+Status UploadFileBuffer::append_data(const Slice& data) {
+    Defer defer {[&] { _size += data.get_size(); }};
+    while (true) {
+        // if buf is not empty, it means there is memory preserved for this buf
+        if (!_buffer.empty()) {
+            std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(), 
data.get_size());
+            break;
+        }
+        // if the buf has no memory reserved, then write to disk first
+        if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder 
!= nullptr) {
+            _holder = _alloc_holder();
+            bool cache_is_not_enough = false;
+            for (auto& segment : _holder->file_segments) {
+                DCHECK(segment->state() == FileBlock::State::SKIP_CACHE ||
+                       segment->state() == FileBlock::State::EMPTY);
+                if (segment->state() == FileBlock::State::SKIP_CACHE) 
[[unlikely]] {
+                    cache_is_not_enough = true;
+                    break;
+                }
+                if (_index_offset != 0) {
+                    
static_cast<void>(segment->change_cache_type_self(CacheType::INDEX));
+                }
+            }
+            // if cache_is_not_enough, cannot use it !
+            _cur_file_segment = _holder->file_segments.begin();
+            _append_offset = (*_cur_file_segment)->range().left;
+            _holder = cache_is_not_enough ? nullptr : std::move(_holder);
+            if (_holder) {
+                (*_cur_file_segment)->get_or_set_downloader();
+            }
+            _is_cache_allocated = true;
+        }
+        if (_holder) [[likely]] {
+            size_t data_remain_size = data.get_size();
+            size_t pos = 0;
+            while (data_remain_size != 0) {
+                auto range = (*_cur_file_segment)->range();
+                size_t segment_remain_size = range.right - _append_offset + 1;
+                size_t append_size = std::min(data_remain_size, 
segment_remain_size);
+                Slice append_data(data.get_data() + pos, append_size);
+                // When there is no available free memory buffer, the data 
will be written to the cache first
+                // and then uploaded to S3 when there is an available free 
memory buffer.
+                // However, if an error occurs during the write process to the 
local cache,
+                // continuing to upload the dirty data from the cache to S3 
will result in erroneous data(Bad segment).
+                // Considering that local disk write failures are rare, a 
simple approach is chosen here,
+                // which is to treat the import as a failure directly when a 
local write failure occurs
+                RETURN_IF_ERROR((*_cur_file_segment)->append(append_data));
+                if (segment_remain_size == append_size) {
+                    RETURN_IF_ERROR((*_cur_file_segment)->finalize_write());
+                    if (++_cur_file_segment != _holder->file_segments.end()) {
+                        (*_cur_file_segment)->get_or_set_downloader();
+                    }
+                }
+                data_remain_size -= append_size;
+                _append_offset += append_size;
+                pos += append_size;
+            }
+            break;
+        } else {
+            // wait allocate buffer pool
+            auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+            swap_buffer(tmp);
+        }
+    }
+    return Status::OK();
+}
+
+/**
+ * 0. allocate one memory buffer
+ * 1. read the content from the cache and then write
+ * it into memory buffer
+ */
+void UploadFileBuffer::read_from_cache() {
+    auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+    swap_buffer(tmp);
+
+    DCHECK(_holder != nullptr);
+    DCHECK(_capacity >= _size);
+    size_t pos = 0;
+    for (auto& segment : _holder->file_segments) {
+        if (pos == _size) {
+            break;
+        }
+        if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+        size_t segment_size = segment->range().size();
+        Slice s(_buffer.get_data() + pos, segment_size);
+        if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] {
+            set_val(std::move(st));
+            return;
+        }
+        pos += segment_size;
+    }
+
+    // the real length should be the buf.get_size() in this situation (consider 
it's the last part,
+    // size of it could be less than 5MB)
+    _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+}
+
+/**
+ * 0. construct the stream ptr if the buffer is not empty
+ * 1. submit the on_upload() callback to executor
+ */
+void UploadFileBuffer::submit() {
+    if (!_buffer.empty()) [[likely]] {
+        _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+    }
+    // If the data is written into file cache
+    if (_holder && _cur_file_segment != _holder->file_segments.end()) {
+        if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok()) 
[[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+    }
+    
static_cast<void>(S3FileBufferPool::GetInstance()->thread_pool()->submit_func(
+            [buf = this->shared_from_this(), this]() {
+                // to extend buf's lifetime
+                // (void)buf;
+                on_upload();
+            }));
+}
+
+/**
+ * write the content of the memory buffer to local file cache
+ */
+void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
+    if (!config::enable_file_cache || _alloc_holder == nullptr) {
+        return;
+    }
+    if (_holder) {
+        return;
+    }
+    if (is_cancelled) {
+        return;
+    }
+    // the data is already written to S3 in this situation
+    // so I didn't handle the file cache write error
+    _holder = _alloc_holder();
+    size_t pos = 0;
+    size_t data_remain_size = _size;
+    for (auto& segment : _holder->file_segments) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t segment_size = segment->range().size();
+        size_t append_size = std::min(data_remain_size, segment_size);
+        if (segment->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && segment->range().right >= _index_offset) 
{
+                // segment->change_cache_type_self(CacheType::INDEX);
+            }
+            segment->get_or_set_downloader();
+            // Another thread may have started downloading due to a query
+            // Just skip putting to cache from UploadFileBuffer
+            if (segment->is_downloader()) {
+                Slice s(_buffer.get_data() + pos, append_size);
+                if (auto st = segment->append(s); !st.ok()) [[unlikely]] {
+                    LOG_WARNING("append data to cache segment failed due to 
{}", st);
+                    return;
+                }
+                if (auto st = segment->finalize_write(); !st.ok()) 
[[unlikely]] {
+                    LOG_WARNING("finalize write to cache segment failed due to 
{}", st);
+                    return;
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) {
+    _type = type;
+    return *this;
+}
+FileBufferBuilder& FileBufferBuilder::set_upload_callback(
+        std::function<void(UploadFileBuffer& buf)> cb) {
+    _upload_cb = std::move(cb);
+    return *this;
+}
+// set callback to do task sync for the caller
+FileBufferBuilder& 
FileBufferBuilder::set_sync_after_complete_task(std::function<bool(Status)> cb) 
{
+    _sync_after_complete_task = std::move(cb);
+    return *this;
+}
+
+FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder(
+        std::function<FileBlocksHolderPtr()> cb) {
+    _alloc_holder_cb = std::move(cb);
+    return *this;
+}
+
+std::shared_ptr<FileBuffer> FileBufferBuilder::build() {
+    OperationState state(_sync_after_complete_task, _is_cancelled);
+    if (_type == BufferType::UPLOAD) {
+        return std::make_shared<UploadFileBuffer>(std::move(_upload_cb), 
std::move(state), _offset,
+                                                  std::move(_alloc_holder_cb), 
_index_offset);
+    }
+    // should never come here
+    return nullptr;
+}
+
+void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t 
s3_write_buffer_size,
+                            ThreadPool* thread_pool) {
+    // the number of buffers could be made a configuration option
+    size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
+    DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&

Review Comment:
   warning: 5 is a magic number; consider replacing it with a named constant 
[readability-magic-numbers]
   ```cpp
       DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
                                       ^
   ```
   



##########
be/src/olap/data_dir.cpp:
##########
@@ -685,12 +686,10 @@ void DataDir::perform_path_gc_by_tablet() {
     LOG(INFO) << "finished one time path gc by tablet.";
 }
 
-void DataDir::perform_path_gc_by_rowsetid() {
+void DataDir::_perform_path_gc_by_rowsetid() {

Review Comment:
   warning: method '_perform_path_gc_by_rowsetid' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/olap/data_dir.h:166:
   ```diff
   -     void _perform_path_gc_by_rowsetid();
   +     static void _perform_path_gc_by_rowsetid();
   ```
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set 
it back to `rowset_meta_pb`,
+    // this won't break const semantics of `rowset_meta_pb`, because 
`rowset_meta_pb` is not changed
+    // before and after call this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
+    json2pb::Pb2JsonOptions json_options;
+    json_options.pretty_json = true;
+    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, 
json_options);
+    return ret;
+}
+
+const io::FileSystemSPtr& RowsetMeta::fs() {
+    if (!_fs) {
+        if (is_local()) {
+            _fs = io::global_local_filesystem();
+        } else {
+            _fs = get_filesystem(resource_id());
+            LOG_IF(WARNING, !_fs) << "Cannot get file system: " << 
resource_id();
+        }
+    }
+    return _fs;
+}
+
+void RowsetMeta::set_fs(io::FileSystemSPtr fs) {
+    if (fs && fs->type() != io::FileSystemType::LOCAL) {
+        _rowset_meta_pb.set_resource_id(fs->id());
+    }
+    _fs = std::move(fs);
+}
+
+void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const {
+    *rs_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+    }
+}
+
+RowsetMetaPB RowsetMeta::get_rowset_pb() {
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb;
+}
+
+void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
+    _schema = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
+}
+
+bool RowsetMeta::_deserialize_from_pb(const std::string& value) {

Review Comment:
   warning: method '_deserialize_from_pb' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   static bool RowsetMeta::_deserialize_from_pb(const std::string& value) {
   ```
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set 
it back to `rowset_meta_pb`,
+    // this won't break const semantics of `rowset_meta_pb`, because 
`rowset_meta_pb` is not changed
+    // before and after call this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
+    json2pb::Pb2JsonOptions json_options;
+    json_options.pretty_json = true;
+    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, 
json_options);
+    return ret;
+}
+
+const io::FileSystemSPtr& RowsetMeta::fs() {
+    if (!_fs) {
+        if (is_local()) {
+            _fs = io::global_local_filesystem();
+        } else {
+            _fs = get_filesystem(resource_id());
+            LOG_IF(WARNING, !_fs) << "Cannot get file system: " << 
resource_id();
+        }
+    }
+    return _fs;
+}
+
+void RowsetMeta::set_fs(io::FileSystemSPtr fs) {
+    if (fs && fs->type() != io::FileSystemType::LOCAL) {
+        _rowset_meta_pb.set_resource_id(fs->id());
+    }
+    _fs = std::move(fs);
+}
+
+void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const {
+    *rs_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+    }
+}
+
+RowsetMetaPB RowsetMeta::get_rowset_pb() {
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb;
+}
+
+void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
+    _schema = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
+}
+
+bool RowsetMeta::_deserialize_from_pb(const std::string& value) {
+    RowsetMetaPB rowset_meta_pb;
+    if (!rowset_meta_pb.ParseFromString(value)) {
+        return false;
+    }
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+        rowset_meta_pb.clear_tablet_schema();
+    }
+    _rowset_meta_pb = rowset_meta_pb;
+    return true;
+}
+
+bool RowsetMeta::_serialize_to_pb(std::string* value) {

Review Comment:
   warning: method '_serialize_to_pb' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   static bool RowsetMeta::_serialize_to_pb(std::string* value) {
   ```
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set 
it back to `rowset_meta_pb`,
+    // this won't break const semantics of `rowset_meta_pb`, because 
`rowset_meta_pb` is not changed
+    // before and after call this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
+    json2pb::Pb2JsonOptions json_options;
+    json_options.pretty_json = true;
+    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, 
json_options);
+    return ret;
+}
+
+const io::FileSystemSPtr& RowsetMeta::fs() {
+    if (!_fs) {
+        if (is_local()) {
+            _fs = io::global_local_filesystem();
+        } else {
+            _fs = get_filesystem(resource_id());
+            LOG_IF(WARNING, !_fs) << "Cannot get file system: " << 
resource_id();
+        }
+    }
+    return _fs;
+}
+
+void RowsetMeta::set_fs(io::FileSystemSPtr fs) {
+    if (fs && fs->type() != io::FileSystemType::LOCAL) {
+        _rowset_meta_pb.set_resource_id(fs->id());
+    }
+    _fs = std::move(fs);
+}
+
+void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const {
+    *rs_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+    }
+}
+
+RowsetMetaPB RowsetMeta::get_rowset_pb() {
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb;
+}
+
+void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
+    _schema = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
+}
+
+bool RowsetMeta::_deserialize_from_pb(const std::string& value) {
+    RowsetMetaPB rowset_meta_pb;
+    if (!rowset_meta_pb.ParseFromString(value)) {
+        return false;
+    }
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+        rowset_meta_pb.clear_tablet_schema();
+    }
+    _rowset_meta_pb = rowset_meta_pb;
+    return true;
+}
+
+bool RowsetMeta::_serialize_to_pb(std::string* value) {
+    if (value == nullptr) {
+        return false;
+    }
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb.SerializeToString(value);
+}
+
+void RowsetMeta::_init() {
+    if (_rowset_meta_pb.rowset_id() > 0) {
+        _rowset_id.init(_rowset_meta_pb.rowset_id());
+    } else {
+        _rowset_id.init(_rowset_meta_pb.rowset_id_v2());
+    }
+}
+
+bool operator==(const RowsetMeta& a, const RowsetMeta& b) {
+    if (a._rowset_id != b._rowset_id) return false;
+    if (a._is_removed_from_rowset_meta != b._is_removed_from_rowset_meta) 
return false;
+    if (!google::protobuf::util::MessageDifferencer::Equals(a._rowset_meta_pb, 
b._rowset_meta_pb))
+        return false;

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
       if 
(!google::protobuf::util::MessageDifferencer::Equals(a._rowset_meta_pb, 
b._rowset_meta_pb)) {
           return false;
   }
   ```
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set 
it back to `rowset_meta_pb`,
+    // this won't break const semantics of `rowset_meta_pb`, because 
`rowset_meta_pb` is not changed
+    // before and after call this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
+    json2pb::Pb2JsonOptions json_options;
+    json_options.pretty_json = true;
+    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, 
json_options);
+    return ret;
+}
+
+const io::FileSystemSPtr& RowsetMeta::fs() {
+    if (!_fs) {
+        if (is_local()) {
+            _fs = io::global_local_filesystem();
+        } else {
+            _fs = get_filesystem(resource_id());
+            LOG_IF(WARNING, !_fs) << "Cannot get file system: " << 
resource_id();
+        }
+    }
+    return _fs;
+}
+
+void RowsetMeta::set_fs(io::FileSystemSPtr fs) {
+    if (fs && fs->type() != io::FileSystemType::LOCAL) {
+        _rowset_meta_pb.set_resource_id(fs->id());
+    }
+    _fs = std::move(fs);
+}
+
+void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const {

Review Comment:
   warning: method 'to_rowset_pb' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   static void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) {
   ```
   



##########
be/src/cloud/config.cpp:
##########
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/config.h"
+
+namespace doris {
+namespace config {

Review Comment:
   warning: nested namespaces can be concatenated 
[modernize-concat-nested-namespaces]
   
   ```suggestion
   namespace doris::config {
   ```
   
   be/src/cloud/config.cpp:24:
   ```diff
   - } // namespace config
   - } // namespace doris
   + } // namespace doris
   ```
   



##########
be/src/olap/rowset/rowset_meta.h:
##########
@@ -328,20 +260,20 @@ class RowsetMeta {
 
     auto& get_segments_key_bounds() { return 
_rowset_meta_pb.segments_key_bounds(); }
 
-    virtual bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
+    bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {

Review Comment:
   warning: method 'get_first_segment_key_bound' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
   ```
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set 
it back to `rowset_meta_pb`,
+    // this won't break const semantics of `rowset_meta_pb`, because 
`rowset_meta_pb` is not changed
+    // before and after call this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
+    json2pb::Pb2JsonOptions json_options;
+    json_options.pretty_json = true;
+    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, 
json_options);
+    return ret;
+}
+
+const io::FileSystemSPtr& RowsetMeta::fs() {
+    if (!_fs) {
+        if (is_local()) {
+            _fs = io::global_local_filesystem();
+        } else {
+            _fs = get_filesystem(resource_id());
+            LOG_IF(WARNING, !_fs) << "Cannot get file system: " << 
resource_id();
+        }
+    }
+    return _fs;
+}
+
+void RowsetMeta::set_fs(io::FileSystemSPtr fs) {
+    if (fs && fs->type() != io::FileSystemType::LOCAL) {
+        _rowset_meta_pb.set_resource_id(fs->id());
+    }
+    _fs = std::move(fs);
+}
+
+void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const {
+    *rs_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+    }
+}
+
+RowsetMetaPB RowsetMeta::get_rowset_pb() {
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb;
+}
+
+void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
+    _schema = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
+}
+
+bool RowsetMeta::_deserialize_from_pb(const std::string& value) {
+    RowsetMetaPB rowset_meta_pb;
+    if (!rowset_meta_pb.ParseFromString(value)) {
+        return false;
+    }
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+        rowset_meta_pb.clear_tablet_schema();
+    }
+    _rowset_meta_pb = rowset_meta_pb;
+    return true;
+}
+
+bool RowsetMeta::_serialize_to_pb(std::string* value) {
+    if (value == nullptr) {
+        return false;
+    }
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb.SerializeToString(value);
+}
+
+void RowsetMeta::_init() {
+    if (_rowset_meta_pb.rowset_id() > 0) {
+        _rowset_id.init(_rowset_meta_pb.rowset_id());
+    } else {
+        _rowset_id.init(_rowset_meta_pb.rowset_id_v2());
+    }
+}
+
+bool operator==(const RowsetMeta& a, const RowsetMeta& b) {
+    if (a._rowset_id != b._rowset_id) return false;

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
       if (a._rowset_id != b._rowset_id) { return false;
   }
   ```
   



##########
be/src/olap/base_tablet.cpp:
##########
@@ -65,26 +53,16 @@ Status BaseTablet::set_tablet_state(TabletState state) {
                 "could not change tablet state from shutdown to {}", state);
     }
     _tablet_meta->set_tablet_state(state);
-    _state = state;
     return Status::OK();
 }
 
-void BaseTablet::_gen_tablet_path() {
-    if (_data_dir != nullptr && _tablet_meta != nullptr) {
-        _tablet_path = fmt::format("{}/{}/{}/{}/{}", _data_dir->path(), 
DATA_PREFIX, shard_id(),
-                                   tablet_id(), schema_hash());
-    }
-}
-
-bool BaseTablet::set_tablet_schema_into_rowset_meta() {
-    bool flag = false;
-    for (RowsetMetaSharedPtr rowset_meta : 
_tablet_meta->all_mutable_rs_metas()) {
-        if (!rowset_meta->tablet_schema()) {
-            rowset_meta->set_tablet_schema(_schema);
-            flag = true;
-        }
+void BaseTablet::update_max_version_schema(const TabletSchemaSPtr& 
tablet_schema) {

Review Comment:
   warning: method 'update_max_version_schema' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/olap/base_tablet.h:65:
   ```diff
   -     void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
   +     static void update_max_version_schema(const TabletSchemaSPtr& 
tablet_schema);
   ```
   



##########
be/src/olap/data_dir.cpp:
##########
@@ -626,22 +626,23 @@ Status DataDir::load() {
     return Status::OK();
 }
 
-void DataDir::add_pending_ids(const std::string& id) {
-    std::lock_guard<std::shared_mutex> wr_lock(_pending_path_mutex);
-    _pending_path_ids.insert(id);
-}
+void DataDir::perform_path_gc() {
+    std::unique_lock<std::mutex> lck(_check_path_mutex);
+    _check_path_cv.wait(lck, [this] {
+        return _stop_bg_worker || !_all_tablet_schemahash_paths.empty() ||
+               !_all_check_paths.empty();
+    });
+    if (_stop_bg_worker) {
+        return;
+    }
 
-void DataDir::remove_pending_ids(const std::string& id) {
-    std::lock_guard<std::shared_mutex> wr_lock(_pending_path_mutex);
-    _pending_path_ids.erase(id);
+    _perform_path_gc_by_tablet();
+    _perform_path_gc_by_rowsetid();
 }
 
 // gc unused tablet schemahash dir
-void DataDir::perform_path_gc_by_tablet() {
-    std::unique_lock<std::mutex> lck(_check_path_mutex);
-    _check_path_cv.wait(
-            lck, [this] { return _stop_bg_worker || 
!_all_tablet_schemahash_paths.empty(); });
-    if (_stop_bg_worker) {
+void DataDir::_perform_path_gc_by_tablet() {

Review Comment:
   warning: method '_perform_path_gc_by_tablet' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/olap/data_dir.h:164:
   ```diff
   -     void _perform_path_gc_by_tablet();
   +     static void _perform_path_gc_by_tablet();
   ```
   



##########
be/src/io/fs/s3_file_bufferpool.h:
##########
@@ -0,0 +1,356 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <cstdint>
+#include <fstream>
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+
+#include "common/status.h"
+#include "io/cache/block/block_file_segment.h"
+#include "runtime/exec_env.h"
+#include "util/slice.h"
+#include "util/threadpool.h"
+
+namespace doris {
+namespace io {
+enum class BufferType { DOWNLOAD, UPLOAD };
+using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
+struct OperationState {
+    OperationState(std::function<bool(Status)> sync_after_complete_task,
+                   std::function<bool()> is_cancelled)
+            : _sync_after_complete_task(std::move(sync_after_complete_task)),
+              _is_cancelled(std::move(is_cancelled)) {}
+    /**
+    * set the val of this operation state which indicates it failed or 
succeeded
+    *
+    * @param S the execution result
+    */
+    void set_val(Status s = Status::OK()) {
+        // make sure we wouldn't sync twice
+        if (_value_set) [[unlikely]] {
+            return;
+        }
+        if (nullptr != _sync_after_complete_task) {
+            _fail_after_sync = _sync_after_complete_task(s);
+        }
+        _value_set = true;
+    }
+
+    /**
+    * detect whether the execution task is done
+    *
+    * @return whether the execution task is done
+    */
+    [[nodiscard]] bool is_cancelled() const {
+        DCHECK(nullptr != _is_cancelled);
+        // If _fail_after_sync is true then it means the sync task already
reported
+        // that the task failed, and the outside file writer might already be
+        // destructed
+        return _fail_after_sync ? true : _is_cancelled();
+    }
+
+    std::function<bool(Status)> _sync_after_complete_task;
+    std::function<bool()> _is_cancelled;
+    bool _value_set = false;
+    bool _fail_after_sync = false;
+};
+
+struct FileBuffer : public std::enable_shared_from_this<FileBuffer> {
+    FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t 
offset,
+               OperationState state, bool reserve = false);
+    virtual ~FileBuffer() { on_finish(); }
+    /**
+    * submit the corresponding task to the async executor
+    */
+    virtual void submit() = 0;
+    /**
+    * append data to the inner memory buffer
+    *
+    * @param S the content to be appended
+    */
+    virtual Status append_data(const Slice& s) = 0;
+    /**
+    * call the reclaim callback when task is done 
+    */
+    void on_finish();
+    /**
+    * swap memory buffer
+    *
+    * @param other which has memory buffer allocated
+    */
+    void swap_buffer(Slice& other);
+    /**
+    * set the val of it's operation state
+    *
+    * @param S the execution result
+    */
+    void set_val(Status s) { _state.set_val(s); }
+    /**
+    * get the start offset of this file buffer
+    *
+    * @return start offset of this file buffer
+    */
+    size_t get_file_offset() const { return _offset; }
+    /**
+    * get the size of the buffered data
+    *
+    * @return the size of the buffered data
+    */
+    size_t get_size() const { return _size; }
+    /**
+    * detect whether the execution task is done
+    *
+    * @return whether the execution task is done
+    */
+    bool is_cancelled() const { return _state.is_cancelled(); }
+
+    std::function<FileBlocksHolderPtr()> _alloc_holder;
+    Slice _buffer;
+    size_t _offset;
+    size_t _size;
+    OperationState _state;
+    size_t _capacity;
+};
+
+struct UploadFileBuffer final : public FileBuffer {
+    UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, 
OperationState state,
+                     size_t offset, std::function<FileBlocksHolderPtr()> 
alloc_holder,
+                     size_t index_offset)
+            : FileBuffer(alloc_holder, offset, state),
+              _upload_to_remote(std::move(upload_cb)),
+              _index_offset(index_offset) {}
+    ~UploadFileBuffer() override = default;
+    void submit() override;
+    /**
+    * set the index offset
+    *
+    * @param offset the index offset
+    */
+    void set_index_offset(size_t offset);
+    Status append_data(const Slice& s) override;

Review Comment:
   warning: function 'doris::io::UploadFileBuffer::append_data' has a 
definition with different parameter names 
[readability-inconsistent-declaration-parameter-name]
   ```cpp
       Status append_data(const Slice& s) override;
              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/io/fs/s3_file_bufferpool.cpp:84:** the definition seen here
   ```cpp
   Status UploadFileBuffer::append_data(const Slice& data) {
                            ^
   ```
   **be/src/io/fs/s3_file_bufferpool.h:149:** differing parameters are named 
here: ('s'), in definition: ('data')
   ```cpp
       Status append_data(const Slice& s) override;
              ^
   ```
   
   </details>
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set 
it back to `rowset_meta_pb`,
+    // this won't break const semantics of `rowset_meta_pb`, because 
`rowset_meta_pb` is not changed
+    // before and after call this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
+    json2pb::Pb2JsonOptions json_options;
+    json_options.pretty_json = true;
+    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, 
json_options);
+    return ret;
+}
+
+const io::FileSystemSPtr& RowsetMeta::fs() {

Review Comment:
   warning: method 'fs' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   const io::FileSystemSPtr& RowsetMeta::fs() const {
   ```
   
   be/src/olap/rowset/rowset_meta.h:48:
   ```diff
   -     const io::FileSystemSPtr& fs();
   +     const io::FileSystemSPtr& fs() const;
   ```
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set 
it back to `rowset_meta_pb`,
+    // this won't break const semantics of `rowset_meta_pb`, because 
`rowset_meta_pb` is not changed
+    // before and after call this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {

Review Comment:
   warning: method 'json_rowset_meta' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   static bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
   ```
   



##########
be/src/io/fs/s3_file_bufferpool.h:
##########
@@ -0,0 +1,356 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <cstdint>
+#include <fstream>
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+
+#include "common/status.h"
+#include "io/cache/block/block_file_segment.h"
+#include "runtime/exec_env.h"
+#include "util/slice.h"
+#include "util/threadpool.h"
+
+namespace doris {
+namespace io {

Review Comment:
   warning: nested namespaces can be concatenated 
[modernize-concat-nested-namespaces]
   
   ```suggestion
   namespace doris::io {
   ```
   
   be/src/io/fs/s3_file_bufferpool.h:354:
   ```diff
   - } // namespace io
   - } // namespace doris
   + } // namespace doris
   ```
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set 
it back to `rowset_meta_pb`,
+    // this won't break const semantics of `rowset_meta_pb`, because 
`rowset_meta_pb` is not changed
+    // before and after call this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
+    json2pb::Pb2JsonOptions json_options;
+    json_options.pretty_json = true;
+    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, 
json_options);
+    return ret;
+}
+
+const io::FileSystemSPtr& RowsetMeta::fs() {
+    if (!_fs) {
+        if (is_local()) {
+            _fs = io::global_local_filesystem();
+        } else {
+            _fs = get_filesystem(resource_id());
+            LOG_IF(WARNING, !_fs) << "Cannot get file system: " << 
resource_id();
+        }
+    }
+    return _fs;
+}
+
+void RowsetMeta::set_fs(io::FileSystemSPtr fs) {

Review Comment:
   warning: method 'set_fs' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/olap/rowset/rowset_meta.h:50:
   ```diff
   -     void set_fs(io::FileSystemSPtr fs);
   +     static void set_fs(io::FileSystemSPtr fs);
   ```
   



##########
be/src/io/fs/s3_file_bufferpool.cpp:
##########
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "s3_file_bufferpool.h"
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "io/cache/block/block_file_segment.h"
+#include "io/fs/s3_common.h"
+#include "runtime/exec_env.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace io {
+
+/**
+ * 0. check if the inner memory buffer is empty or not
+ * 1. reclaim the memory buffer if it's not empty
+ */
+void FileBuffer::on_finish() {
+    if (_buffer.empty()) {
+        return;
+    }
+    S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(), 
_capacity});
+    _buffer.clear();
+}
+
+/**
+ * take other buffer's memory space and refresh capacity
+ */
+void FileBuffer::swap_buffer(Slice& other) {
+    _buffer = other;
+    _capacity = _buffer.get_size();
+    other.clear();
+}
+
+FileBuffer::FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, 
size_t offset,
+                       OperationState state, bool reserve)
+        : _alloc_holder(std::move(alloc_holder)),
+          _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)),
+          _offset(offset),
+          _size(0),
+          _state(std::move(state)),
+          _capacity(_buffer.get_size()) {}
+
+/**
+ * 0. check if file cache holder allocated
+ * 1. update the cache's type to index cache
+ */
+void UploadFileBuffer::set_index_offset(size_t offset) {
+    _index_offset = offset;
+    if (_holder) {
+        bool change_to_index_cache = false;
+        for (auto iter = _holder->file_segments.begin(); iter != 
_holder->file_segments.end();
+             ++iter) {
+            if (iter == _cur_file_segment) {
+                change_to_index_cache = true;
+            }
+            if (change_to_index_cache) {
+                
static_cast<void>((*iter)->change_cache_type_self(CacheType::INDEX));
+            }
+        }
+    }
+}
+
+/**
+ * 0. when there is memory preserved, directly write data to buf
+ * 1. write to the file cache otherwise; then we'll wait for a free buffer
and take it over
+ */
+Status UploadFileBuffer::append_data(const Slice& data) {
+    Defer defer {[&] { _size += data.get_size(); }};
+    while (true) {
+        // if buf is not empty, it means there is memory preserved for this buf
+        if (!_buffer.empty()) {
+            std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(), 
data.get_size());
+            break;
+        }
+        // if the buf has no memory reserved, then write to disk first
+        if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder 
!= nullptr) {
+            _holder = _alloc_holder();
+            bool cache_is_not_enough = false;
+            for (auto& segment : _holder->file_segments) {
+                DCHECK(segment->state() == FileBlock::State::SKIP_CACHE ||
+                       segment->state() == FileBlock::State::EMPTY);
+                if (segment->state() == FileBlock::State::SKIP_CACHE) 
[[unlikely]] {
+                    cache_is_not_enough = true;
+                    break;
+                }
+                if (_index_offset != 0) {
+                    
static_cast<void>(segment->change_cache_type_self(CacheType::INDEX));
+                }
+            }
+            // if cache_is_not_enough, cannot use it !
+            _cur_file_segment = _holder->file_segments.begin();
+            _append_offset = (*_cur_file_segment)->range().left;
+            _holder = cache_is_not_enough ? nullptr : std::move(_holder);
+            if (_holder) {
+                (*_cur_file_segment)->get_or_set_downloader();
+            }
+            _is_cache_allocated = true;
+        }
+        if (_holder) [[likely]] {
+            size_t data_remain_size = data.get_size();
+            size_t pos = 0;
+            while (data_remain_size != 0) {
+                auto range = (*_cur_file_segment)->range();
+                size_t segment_remain_size = range.right - _append_offset + 1;
+                size_t append_size = std::min(data_remain_size, 
segment_remain_size);
+                Slice append_data(data.get_data() + pos, append_size);
+                // When there is no available free memory buffer, the data 
will be written to the cache first
+                // and then uploaded to S3 when there is an available free 
memory buffer.
+                // However, if an error occurs during the write process to the 
local cache,
+                // continuing to upload the dirty data from the cache to S3 
will result in erroneous data(Bad segment).
+                // Considering that local disk write failures are rare, a 
simple approach is chosen here,
+                // which is to treat the import as a failure directly when a 
local write failure occurs
+                RETURN_IF_ERROR((*_cur_file_segment)->append(append_data));
+                if (segment_remain_size == append_size) {
+                    RETURN_IF_ERROR((*_cur_file_segment)->finalize_write());
+                    if (++_cur_file_segment != _holder->file_segments.end()) {
+                        (*_cur_file_segment)->get_or_set_downloader();
+                    }
+                }
+                data_remain_size -= append_size;
+                _append_offset += append_size;
+                pos += append_size;
+            }
+            break;
+        } else {
+            // wait allocate buffer pool
+            auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+            swap_buffer(tmp);
+        }
+    }
+    return Status::OK();
+}
+
+/**
+ * 0. allocate one memory buffer
+ * 1. read the content from the cache and then write
+ * it into memory buffer
+ */
+void UploadFileBuffer::read_from_cache() {
+    auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+    swap_buffer(tmp);
+
+    DCHECK(_holder != nullptr);
+    DCHECK(_capacity >= _size);
+    size_t pos = 0;
+    for (auto& segment : _holder->file_segments) {
+        if (pos == _size) {
+            break;
+        }
+        if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+        size_t segment_size = segment->range().size();
+        Slice s(_buffer.get_data() + pos, segment_size);
+        if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] {
+            set_val(std::move(st));
+            return;
+        }
+        pos += segment_size;
+    }
+
+    // the real length should be the buf.get_size() in this situation (consider it's the last part,
+    // size of it could be less than 5MB)
+    _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+}
+
+/**
+ * 0. construct the stream ptr if the buffer is not empty
+ * 1. submit the on_upload() callback to executor
+ */
+void UploadFileBuffer::submit() {
+    if (!_buffer.empty()) [[likely]] {
+        _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+    }
+    // If the data is written into file cache
+    if (_holder && _cur_file_segment != _holder->file_segments.end()) {
+        if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok()) 
[[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+    }
+    
static_cast<void>(S3FileBufferPool::GetInstance()->thread_pool()->submit_func(
+            [buf = this->shared_from_this(), this]() {
+                // to extend buf's lifetime
+                // (void)buf;
+                on_upload();
+            }));
+}
+
+/**
+ * write the content of the memory buffer to local file cache
+ */
+void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
+    if (!config::enable_file_cache || _alloc_holder == nullptr) {
+        return;
+    }
+    if (_holder) {
+        return;
+    }
+    if (is_cancelled) {
+        return;
+    }
+    // the data is already written to S3 in this situation
+    // so I didn't handle the file cache write error
+    _holder = _alloc_holder();
+    size_t pos = 0;
+    size_t data_remain_size = _size;
+    for (auto& segment : _holder->file_segments) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t segment_size = segment->range().size();
+        size_t append_size = std::min(data_remain_size, segment_size);
+        if (segment->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && segment->range().right >= _index_offset) 
{
+                // segment->change_cache_type_self(CacheType::INDEX);
+            }
+            segment->get_or_set_downloader();
+            // Another thread may have started downloading due to a query
+            // Just skip putting to cache from UploadFileBuffer
+            if (segment->is_downloader()) {
+                Slice s(_buffer.get_data() + pos, append_size);
+                if (auto st = segment->append(s); !st.ok()) [[unlikely]] {
+                    LOG_WARNING("append data to cache segment failed due to {}", st);
+                    return;
+                }
+                if (auto st = segment->finalize_write(); !st.ok()) 
[[unlikely]] {
+                    LOG_WARNING("finalize write to cache segment failed due to {}", st);
+                    return;
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) {
+    _type = type;
+    return *this;
+}
+FileBufferBuilder& FileBufferBuilder::set_upload_callback(
+        std::function<void(UploadFileBuffer& buf)> cb) {
+    _upload_cb = std::move(cb);
+    return *this;
+}
+// set callback to do task sync for the caller
+FileBufferBuilder& 
FileBufferBuilder::set_sync_after_complete_task(std::function<bool(Status)> cb) 
{
+    _sync_after_complete_task = std::move(cb);
+    return *this;
+}
+
+FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder(
+        std::function<FileBlocksHolderPtr()> cb) {
+    _alloc_holder_cb = std::move(cb);
+    return *this;
+}
+
+std::shared_ptr<FileBuffer> FileBufferBuilder::build() {

Review Comment:
   warning: method 'build' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   std::shared_ptr<FileBuffer> FileBufferBuilder::build() const {
   ```
   
   be/src/io/fs/s3_file_bufferpool.h:218:
   ```diff
   -     std::shared_ptr<FileBuffer> build();
   +     std::shared_ptr<FileBuffer> build() const;
   ```
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set it back to `rowset_meta_pb`;
+    // this won't break the const semantics of `rowset_meta_pb`, because `rowset_meta_pb` is not changed
+    // before and after calling this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
+    json2pb::Pb2JsonOptions json_options;
+    json_options.pretty_json = true;
+    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, 
json_options);
+    return ret;
+}
+
+const io::FileSystemSPtr& RowsetMeta::fs() {
+    if (!_fs) {
+        if (is_local()) {
+            _fs = io::global_local_filesystem();
+        } else {
+            _fs = get_filesystem(resource_id());
+            LOG_IF(WARNING, !_fs) << "Cannot get file system: " << 
resource_id();
+        }
+    }
+    return _fs;
+}
+
+void RowsetMeta::set_fs(io::FileSystemSPtr fs) {
+    if (fs && fs->type() != io::FileSystemType::LOCAL) {
+        _rowset_meta_pb.set_resource_id(fs->id());
+    }
+    _fs = std::move(fs);
+}
+
+void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const {
+    *rs_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+    }
+}
+
+RowsetMetaPB RowsetMeta::get_rowset_pb() {
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb;
+}
+
+void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
+    _schema = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
+}
+
+bool RowsetMeta::_deserialize_from_pb(const std::string& value) {
+    RowsetMetaPB rowset_meta_pb;
+    if (!rowset_meta_pb.ParseFromString(value)) {
+        return false;
+    }
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+        rowset_meta_pb.clear_tablet_schema();
+    }
+    _rowset_meta_pb = rowset_meta_pb;
+    return true;
+}
+
+bool RowsetMeta::_serialize_to_pb(std::string* value) {
+    if (value == nullptr) {
+        return false;
+    }
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb.SerializeToString(value);
+}
+
+void RowsetMeta::_init() {
+    if (_rowset_meta_pb.rowset_id() > 0) {
+        _rowset_id.init(_rowset_meta_pb.rowset_id());
+    } else {
+        _rowset_id.init(_rowset_meta_pb.rowset_id_v2());
+    }
+}
+
+bool operator==(const RowsetMeta& a, const RowsetMeta& b) {
+    if (a._rowset_id != b._rowset_id) return false;
+    if (a._is_removed_from_rowset_meta != b._is_removed_from_rowset_meta) 
return false;

Review Comment:
   warning: statement should be inside braces 
[readability-braces-around-statements]
   
   ```suggestion
       if (a._is_removed_from_rowset_meta != b._is_removed_from_rowset_meta) { 
return false;
   }
   ```
   



##########
be/src/olap/base_tablet.h:
##########
@@ -17,131 +17,86 @@
 
 #pragma once
 
-#include <butil/macros.h>
-#include <gen_cpp/olap_file.pb.h>
-#include <stdint.h>
-
 #include <memory>
+#include <shared_mutex>
 #include <string>
 
 #include "common/status.h"
-#include "olap/olap_common.h"
+#include "olap/tablet_fwd.h"
 #include "olap/tablet_meta.h"
-#include "olap/tablet_schema.h"
 #include "util/metrics.h"
 
 namespace doris {
+struct RowSetSplits;
+struct RowsetWriterContext;
+class RowsetWriter;
 
-class DataDir;
-
-// Base class for all tablet classes, currently only olap/Tablet
-// The fields and methods in this class is not final, it will change as memory
-// storage engine evolves.
+// Base class for all tablet classes
 class BaseTablet {
 public:
-    BaseTablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir);
+    explicit BaseTablet(TabletMetaSharedPtr tablet_meta);
     virtual ~BaseTablet();
+    BaseTablet(const BaseTablet&) = delete;
+    BaseTablet& operator=(const BaseTablet&) = delete;
 
-    DataDir* data_dir() const;
-    const std::string& tablet_path() const;
-
-    TabletState tablet_state() const { return _state; }
+    const std::string& tablet_path() const { return _tablet_path; }
+    TabletState tablet_state() const { return _tablet_meta->tablet_state(); }
     Status set_tablet_state(TabletState state);
+    int64_t table_id() const { return _tablet_meta->table_id(); }
+    int64_t partition_id() const { return _tablet_meta->partition_id(); }
+    int64_t tablet_id() const { return _tablet_meta->tablet_id(); }
+    int32_t schema_hash() const { return _tablet_meta->schema_hash(); }
+    KeysType keys_type() const { return 
_tablet_meta->tablet_schema()->keys_type(); }
+    size_t num_key_columns() const { return 
_tablet_meta->tablet_schema()->num_key_columns(); }
+    bool enable_unique_key_merge_on_write() const {
+#ifdef BE_TEST
+        if (_tablet_meta == nullptr) {
+            return false;
+        }
+#endif
+        return _tablet_meta->enable_unique_key_merge_on_write();
+    }
 
     // Property encapsulated in TabletMeta
-    const TabletMetaSharedPtr& tablet_meta();
+    const TabletMetaSharedPtr& tablet_meta() { return _tablet_meta; }
 
-    TabletUid tablet_uid() const;
-    int64_t table_id() const;
-    // Returns a string can be used to uniquely identify a tablet.
-    // The result string will often be printed to the log.
-    const std::string full_name() const;
-    int64_t partition_id() const;
-    int64_t tablet_id() const;
-    int64_t replica_id() const;
-    int32_t schema_hash() const;
-    int16_t shard_id() const;
+    // FIXME(plat1ko): It is not appropriate to expose this lock
+    std::shared_mutex& get_header_lock() { return _meta_lock; }
 
-    int64_t storage_policy_id() const { return 
_tablet_meta->storage_policy_id(); }
+    void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
 
-    void set_storage_policy_id(int64_t id) { 
_tablet_meta->set_storage_policy_id(id); }
+    TabletSchemaSPtr tablet_schema() const {

Review Comment:
   warning: method 'tablet_schema' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static TabletSchemaSPtr tablet_schema() {
   ```
   



##########
be/src/olap/rowset/rowset_meta.h:
##########
@@ -328,20 +260,20 @@ class RowsetMeta {
 
     auto& get_segments_key_bounds() { return 
_rowset_meta_pb.segments_key_bounds(); }
 
-    virtual bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
+    bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
         // for compatibility, old version has not segment key bounds
         if (_rowset_meta_pb.segments_key_bounds_size() == 0) {
             return false;
         }
-        *key_bounds = _rowset_meta_pb.segments_key_bounds(0);
+        *key_bounds = *_rowset_meta_pb.segments_key_bounds().begin();
         return true;
     }
-    virtual bool get_last_segment_key_bound(KeyBoundsPB* key_bounds) {
+
+    bool get_last_segment_key_bound(KeyBoundsPB* key_bounds) {

Review Comment:
   warning: method 'get_last_segment_key_bound' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static bool get_last_segment_key_bound(KeyBoundsPB* key_bounds) {
   ```
   



##########
be/src/olap/rowset/rowset_meta.cpp:
##########
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/rowset/rowset_meta.h"
+
+#include "common/logging.h"
+#include "google/protobuf/util/message_differencer.h"
+#include "io/fs/local_file_system.h"
+#include "json2pb/json_to_pb.h"
+#include "json2pb/pb_to_json.h"
+#include "olap/olap_common.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+
+namespace doris {
+
+RowsetMeta::~RowsetMeta() = default;
+
+bool RowsetMeta::init(const std::string& pb_rowset_meta) {
+    bool ret = _deserialize_from_pb(pb_rowset_meta);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_pb(const RowsetMetaPB& rowset_meta_pb) {
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+    }
+    // Release ownership of TabletSchemaPB from `rowset_meta_pb` and then set it back to `rowset_meta_pb`;
+    // this won't break the const semantics of `rowset_meta_pb`, because `rowset_meta_pb` is not changed
+    // before and after calling this method.
+    auto& mut_rowset_meta_pb = const_cast<RowsetMetaPB&>(rowset_meta_pb);
+    auto schema = mut_rowset_meta_pb.release_tablet_schema();
+    _rowset_meta_pb = mut_rowset_meta_pb;
+    mut_rowset_meta_pb.set_allocated_tablet_schema(schema);
+    _init();
+    return true;
+}
+
+bool RowsetMeta::init_from_json(const std::string& json_rowset_meta) {
+    bool ret = json2pb::JsonToProtoMessage(json_rowset_meta, &_rowset_meta_pb);
+    if (!ret) {
+        return false;
+    }
+    _init();
+    return true;
+}
+
+bool RowsetMeta::json_rowset_meta(std::string* json_rowset_meta) {
+    json2pb::Pb2JsonOptions json_options;
+    json_options.pretty_json = true;
+    bool ret = json2pb::ProtoMessageToJson(_rowset_meta_pb, json_rowset_meta, 
json_options);
+    return ret;
+}
+
+const io::FileSystemSPtr& RowsetMeta::fs() {
+    if (!_fs) {
+        if (is_local()) {
+            _fs = io::global_local_filesystem();
+        } else {
+            _fs = get_filesystem(resource_id());
+            LOG_IF(WARNING, !_fs) << "Cannot get file system: " << 
resource_id();
+        }
+    }
+    return _fs;
+}
+
+void RowsetMeta::set_fs(io::FileSystemSPtr fs) {
+    if (fs && fs->type() != io::FileSystemType::LOCAL) {
+        _rowset_meta_pb.set_resource_id(fs->id());
+    }
+    _fs = std::move(fs);
+}
+
+void RowsetMeta::to_rowset_pb(RowsetMetaPB* rs_meta_pb) const {
+    *rs_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rs_meta_pb->mutable_tablet_schema());
+    }
+}
+
+RowsetMetaPB RowsetMeta::get_rowset_pb() {
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb;
+}
+
+void RowsetMeta::set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
+    _schema = TabletSchemaCache::instance()->insert(tablet_schema->to_key());
+}
+
+bool RowsetMeta::_deserialize_from_pb(const std::string& value) {
+    RowsetMetaPB rowset_meta_pb;
+    if (!rowset_meta_pb.ParseFromString(value)) {
+        return false;
+    }
+    if (rowset_meta_pb.has_tablet_schema()) {
+        _schema = TabletSchemaCache::instance()->insert(
+                rowset_meta_pb.tablet_schema().SerializeAsString());
+        rowset_meta_pb.clear_tablet_schema();
+    }
+    _rowset_meta_pb = rowset_meta_pb;
+    return true;
+}
+
+bool RowsetMeta::_serialize_to_pb(std::string* value) {
+    if (value == nullptr) {
+        return false;
+    }
+    RowsetMetaPB rowset_meta_pb = _rowset_meta_pb;
+    if (_schema) {
+        _schema->to_schema_pb(rowset_meta_pb.mutable_tablet_schema());
+    }
+    return rowset_meta_pb.SerializeToString(value);
+}
+
+void RowsetMeta::_init() {

Review Comment:
   warning: method '_init' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/olap/rowset/rowset_meta.h:306:
   ```diff
   -     void _init();
   +     static void _init();
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to