This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 605aa8d9240 [fix](move-memtable) fix timeout to get tablet schema (#33256)
605aa8d9240 is described below

commit 605aa8d92409d8830e38754e5b7787a63f1a69a8
Author: Xin Liao <[email protected]>
AuthorDate: Thu Apr 4 21:25:14 2024 +0800

    [fix](move-memtable) fix timeout to get tablet schema (#33256)
---
 be/src/runtime/exec_env.cpp                        |  2 +-
 be/src/runtime/exec_env.h                          |  6 +--
 be/src/runtime/exec_env_init.cpp                   |  6 +--
 ...ream_stub_pool.cpp => load_stream_map_pool.cpp} | 24 +++++-----
 ...d_stream_stub_pool.h => load_stream_map_pool.h} | 15 +++---
 be/src/vec/sink/load_stream_stub.cpp               |  2 +-
 be/src/vec/sink/volap_table_sink_v2.cpp            |  2 -
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       | 55 +++++++++++-----------
 be/src/vec/sink/writer/vtablet_writer_v2.h         |  2 +-
 ...pool_test.cpp => load_stream_stub_map_test.cpp} | 13 +++--
 10 files changed, 63 insertions(+), 64 deletions(-)

diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 8806ff2900f..306ad94099f 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -34,7 +34,7 @@
 #include "util/debug_util.h"
 #include "util/time.h"
 #include "vec/sink/delta_writer_v2_pool.h"
-#include "vec/sink/load_stream_stub_pool.h"
+#include "vec/sink/load_stream_map_pool.h"
 
 namespace doris {
 
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 1ba35d6b200..e24727d845c 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -82,7 +82,7 @@ class RuntimeQueryStatiticsMgr;
 class TMasterInfo;
 class LoadChannelMgr;
 class LoadStreamMgr;
-class LoadStreamStubPool;
+class LoadStreamMapPool;
 class StreamLoadExecutor;
 class RoutineLoadTaskExecutor;
 class SmallFileMgr;
@@ -264,7 +264,7 @@ public:
     }
 
 #endif
-    LoadStreamStubPool* load_stream_stub_pool() { return 
_load_stream_stub_pool.get(); }
+    LoadStreamMapPool* load_stream_map_pool() { return 
_load_stream_map_pool.get(); }
 
     vectorized::DeltaWriterV2Pool* delta_writer_v2_pool() { return 
_delta_writer_v2_pool.get(); }
 
@@ -393,7 +393,7 @@ private:
     // To save meta info of external file, such as parquet footer.
     FileMetaCache* _file_meta_cache = nullptr;
     std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
-    std::unique_ptr<LoadStreamStubPool> _load_stream_stub_pool;
+    std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
     std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
     std::shared_ptr<WalManager> _wal_manager;
     DNSCache* _dns_cache = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index c44605dc845..a8b2131d883 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -104,7 +104,7 @@
 #include "vec/exec/scan/scanner_scheduler.h"
 #include "vec/runtime/vdata_stream_mgr.h"
 #include "vec/sink/delta_writer_v2_pool.h"
-#include "vec/sink/load_stream_stub_pool.h"
+#include "vec/sink/load_stream_map_pool.h"
 #include "vec/spill/spill_stream_manager.h"
 
 #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && 
!defined(LEAK_SANITIZER) && \
@@ -245,7 +245,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     _block_spill_mgr = new BlockSpillManager(store_paths);
     _group_commit_mgr = new GroupCommitMgr(this);
     _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
-    _load_stream_stub_pool = std::make_unique<LoadStreamStubPool>();
+    _load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
     _delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
     _file_cache_open_fd_cache = std::make_unique<io::FDCache>();
     _wal_manager = WalManager::create_shared(this, 
config::group_commit_wal_path);
@@ -595,7 +595,7 @@ void ExecEnv::destroy() {
     _stream_load_executor.reset();
     _memtable_memory_limiter.reset();
     _delta_writer_v2_pool.reset();
-    _load_stream_stub_pool.reset();
+    _load_stream_map_pool.reset();
     _file_cache_open_fd_cache.reset();
 
     // StorageEngine must be destoried before _page_no_cache_mem_tracker.reset 
and _cache_manager destory
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp 
b/be/src/vec/sink/load_stream_map_pool.cpp
similarity index 85%
rename from be/src/vec/sink/load_stream_stub_pool.cpp
rename to be/src/vec/sink/load_stream_map_pool.cpp
index 3eae49aff77..f335f05e162 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -15,21 +15,22 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "vec/sink/load_stream_stub_pool.h"
+#include "vec/sink/load_stream_map_pool.h"
 
 #include "util/debug_points.h"
-#include "vec/sink/load_stream_stub.h"
 
 namespace doris {
 class TExpr;
 
 LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int 
num_streams, int num_use,
-                             LoadStreamStubPool* pool)
+                             LoadStreamMapPool* pool)
         : _load_id(load_id),
           _src_id(src_id),
           _num_streams(num_streams),
           _use_cnt(num_use),
-          _pool(pool) {
+          _pool(pool),
+          _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
+          _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
     DCHECK(num_streams > 0) << "stream num should be greater than 0";
     DCHECK(num_use > 0) << "use num should be greater than 0";
 }
@@ -41,10 +42,9 @@ std::shared_ptr<Streams> 
LoadStreamMap::get_or_create(int64_t dst_id) {
         return streams;
     }
     streams = std::make_shared<Streams>();
-    auto schema_map = std::make_shared<IndexToTabletSchema>();
-    auto mow_map = std::make_shared<IndexToEnableMoW>();
     for (int i = 0; i < _num_streams; i++) {
-        streams->emplace_back(new LoadStreamStub(_load_id, _src_id, 
schema_map, mow_map));
+        streams->emplace_back(new LoadStreamStub(_load_id, _src_id, 
_tablet_schema_for_index,
+                                                 
_enable_unique_mow_for_index));
     }
     _streams_for_node[dst_id] = streams;
     return streams;
@@ -103,11 +103,11 @@ Status LoadStreamMap::close_load() {
     });
 }
 
-LoadStreamStubPool::LoadStreamStubPool() = default;
+LoadStreamMapPool::LoadStreamMapPool() = default;
 
-LoadStreamStubPool::~LoadStreamStubPool() = default;
-std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId 
load_id, int64_t src_id,
-                                                                 int 
num_streams, int num_use) {
+LoadStreamMapPool::~LoadStreamMapPool() = default;
+std::shared_ptr<LoadStreamMap> LoadStreamMapPool::get_or_create(UniqueId 
load_id, int64_t src_id,
+                                                                int 
num_streams, int num_use) {
     std::lock_guard<std::mutex> lock(_mutex);
     std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
     if (streams != nullptr) {
@@ -118,7 +118,7 @@ std::shared_ptr<LoadStreamMap> 
LoadStreamStubPool::get_or_create(UniqueId load_i
     return streams;
 }
 
-void LoadStreamStubPool::erase(UniqueId load_id) {
+void LoadStreamMapPool::erase(UniqueId load_id) {
     std::lock_guard<std::mutex> lock(_mutex);
     _pool.erase(load_id);
 }
diff --git a/be/src/vec/sink/load_stream_stub_pool.h 
b/be/src/vec/sink/load_stream_map_pool.h
similarity index 91%
rename from be/src/vec/sink/load_stream_stub_pool.h
rename to be/src/vec/sink/load_stream_map_pool.h
index 65f3bb66cd2..aad12dba2aa 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -63,19 +63,20 @@
 #include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/load_stream_stub.h"
 
 namespace doris {
 
 class LoadStreamStub;
 
-class LoadStreamStubPool;
+class LoadStreamMapPool;
 
 using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
 
 class LoadStreamMap {
 public:
     LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int 
num_use,
-                  LoadStreamStubPool* pool);
+                  LoadStreamMapPool* pool);
 
     std::shared_ptr<Streams> get_or_create(int64_t dst_id);
 
@@ -103,17 +104,19 @@ private:
     std::atomic<int> _use_cnt;
     std::mutex _mutex;
     std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
-    LoadStreamStubPool* _pool = nullptr;
+    LoadStreamMapPool* _pool = nullptr;
+    std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
+    std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
 
     std::mutex _tablets_to_commit_mutex;
     std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
 };
 
-class LoadStreamStubPool {
+class LoadStreamMapPool {
 public:
-    LoadStreamStubPool();
+    LoadStreamMapPool();
 
-    ~LoadStreamStubPool();
+    ~LoadStreamMapPool();
 
     std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t 
src_id, int num_streams,
                                                  int num_use);
diff --git a/be/src/vec/sink/load_stream_stub.cpp 
b/be/src/vec/sink/load_stream_stub.cpp
index 2e118dce5c1..b4a9ac18643 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -250,7 +250,7 @@ Status LoadStreamStub::get_schema(const 
std::vector<PTabletID>& tablets) {
     PStreamHeader header;
     *header.mutable_load_id() = _load_id;
     header.set_src_id(_src_id);
-    header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
+    header.set_opcode(doris::PStreamHeader::GET_SCHEMA);
     std::ostringstream oss;
     oss << "fetching tablet schema from stream " << _stream_id
         << ", load id: " << print_id(_load_id) << ", tablet id:";
diff --git a/be/src/vec/sink/volap_table_sink_v2.cpp 
b/be/src/vec/sink/volap_table_sink_v2.cpp
index a41b77ace53..a73ee483bd3 100644
--- a/be/src/vec/sink/volap_table_sink_v2.cpp
+++ b/be/src/vec/sink/volap_table_sink_v2.cpp
@@ -31,8 +31,6 @@
 #include "runtime/runtime_state.h"
 #include "util/doris_metrics.h"
 #include "vec/sink/delta_writer_v2_pool.h"
-#include "vec/sink/load_stream_stub.h"
-#include "vec/sink/load_stream_stub_pool.h"
 
 namespace doris {
 class TExpr;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 42594f13d99..bbe00b589ee 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -48,8 +48,8 @@
 #include "vec/core/block.h"
 #include "vec/sink/delta_writer_v2_pool.h"
 // NOLINTNEXTLINE(unused-includes)
+#include "vec/sink/load_stream_map_pool.h"
 #include "vec/sink/load_stream_stub.h" // IWYU pragma: keep
-#include "vec/sink/load_stream_stub_pool.h"
 #include "vec/sink/vtablet_block_convertor.h"
 #include "vec/sink/vtablet_finder.h"
 
@@ -96,7 +96,7 @@ Status VTabletWriterV2::_incremental_open_streams(
                     tablet.set_partition_id(partition->id);
                     tablet.set_index_id(index.index_id);
                     tablet.set_tablet_id(tablet_id);
-                    if (!_streams_for_node->contains(node)) {
+                    if (!_load_stream_map->contains(node)) {
                         new_backends.insert(node);
                     }
                     _tablets_for_node[node].emplace(tablet_id, tablet);
@@ -112,7 +112,7 @@ Status VTabletWriterV2::_incremental_open_streams(
         }
     }
     for (int64_t dst_id : new_backends) {
-        auto streams = _streams_for_node->get_or_create(dst_id);
+        auto streams = _load_stream_map->get_or_create(dst_id);
         RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
     }
     return Status::OK();
@@ -242,7 +242,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, 
RuntimeProfile* profile) {
     } else {
         _delta_writer_for_tablet = 
std::make_shared<DeltaWriterV2Map>(_load_id);
     }
-    _streams_for_node = 
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+    _load_stream_map = 
ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
             _load_id, _backend_id, _stream_per_node, _num_local_sink);
     return Status::OK();
 }
@@ -263,7 +263,7 @@ Status VTabletWriterV2::open(RuntimeState* state, 
RuntimeProfile* profile) {
 
 Status VTabletWriterV2::_open_streams() {
     for (auto& [dst_id, _] : _tablets_for_node) {
-        auto streams = _streams_for_node->get_or_create(dst_id);
+        auto streams = _load_stream_map->get_or_create(dst_id);
         RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
     }
     return Status::OK();
@@ -361,7 +361,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, 
int64_t partition_id,
         tablet.set_tablet_id(tablet_id);
         VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id, 
index_id, tablet_id);
         _tablets_for_node[node_id].emplace(tablet_id, tablet);
-        
streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index));
+        streams.emplace_back(_load_stream_map->at(node_id)->at(_stream_index));
         RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, 
tablet_id));
     }
     _stream_index = (_stream_index + 1) % _stream_per_node;
@@ -470,13 +470,13 @@ Status VTabletWriterV2::_cancel(Status status) {
         _delta_writer_for_tablet->cancel(status);
         _delta_writer_for_tablet.reset();
     }
-    if (_streams_for_node) {
-        _streams_for_node->for_each([status](int64_t dst_id, const Streams& 
streams) {
+    if (_load_stream_map) {
+        _load_stream_map->for_each([status](int64_t dst_id, const Streams& 
streams) {
             for (auto& stream : streams) {
                 stream->cancel(status);
             }
         });
-        _streams_for_node->release();
+        _load_stream_map->release();
     }
     return Status::OK();
 }
@@ -542,29 +542,28 @@ Status VTabletWriterV2::close(Status exec_status) {
         }
 
         _calc_tablets_to_commit();
-        const bool is_last_sink = _streams_for_node->release();
+        const bool is_last_sink = _load_stream_map->release();
         LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << 
is_last_sink
                   << ", load_id=" << print_id(_load_id);
 
         // send CLOSE_LOAD and close_wait on all streams
         if (is_last_sink) {
-            RETURN_IF_ERROR(_streams_for_node->close_load());
+            RETURN_IF_ERROR(_load_stream_map->close_load());
             SCOPED_TIMER(_close_load_timer);
-            RETURN_IF_ERROR(_streams_for_node->for_each_st(
-                    [this](int64_t dst_id, const Streams& streams) -> Status {
-                        for (auto& stream : streams) {
-                            int64_t remain_ms =
-                                    
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
-                                    _timeout_watch.elapsed_time() / 1000 / 
1000;
-                            if (remain_ms <= 0) {
-                                LOG(WARNING) << "load timed out before close 
waiting, load_id="
-                                             << print_id(_load_id);
-                                return Status::TimedOut("load timed out before 
close waiting");
-                            }
-                            RETURN_IF_ERROR(stream->close_wait(_state, 
remain_ms));
-                        }
-                        return Status::OK();
-                    }));
+            RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t 
dst_id,
+                                                                 const 
Streams& streams) -> Status {
+                for (auto& stream : streams) {
+                    int64_t remain_ms = 
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+                                        _timeout_watch.elapsed_time() / 1000 / 
1000;
+                    if (remain_ms <= 0) {
+                        LOG(WARNING) << "load timed out before close waiting, 
load_id="
+                                     << print_id(_load_id);
+                        return Status::TimedOut("load timed out before close 
waiting");
+                    }
+                    RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
+                }
+                return Status::OK();
+            }));
         }
 
         // calculate and submit commit info
@@ -573,7 +572,7 @@ Status VTabletWriterV2::close(Status exec_status) {
             std::unordered_map<int64_t, Status> failed_reason;
             std::vector<TTabletCommitInfo> tablet_commit_infos;
 
-            _streams_for_node->for_each([&](int64_t dst_id, const Streams& 
streams) {
+            _load_stream_map->for_each([&](int64_t dst_id, const Streams& 
streams) {
                 std::unordered_set<int64_t> known_tablets;
                 for (const auto& stream : streams) {
                     for (auto [tablet_id, reason] : stream->failed_tablets()) {
@@ -648,7 +647,7 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
             }
             LOG(WARNING) << msg;
         }
-        _streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit);
+        _load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
     }
 }
 
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h 
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 7785733bf4a..c04cff15cf4 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -217,7 +217,7 @@ private:
     std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> 
_tablets_for_node;
     std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
 
-    std::shared_ptr<LoadStreamMap> _streams_for_node;
+    std::shared_ptr<LoadStreamMap> _load_stream_map;
 
     size_t _stream_index = 0;
     std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp 
b/be/test/vec/exec/load_stream_stub_map_test.cpp
similarity index 88%
rename from be/test/vec/exec/load_stream_stub_pool_test.cpp
rename to be/test/vec/exec/load_stream_stub_map_test.cpp
index e576db3bdaa..5f8743340ac 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_map_test.cpp
@@ -14,22 +14,21 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#include "vec/sink/load_stream_stub_pool.h"
-
 #include <gtest/gtest.h>
 
+#include "vec/sink/load_stream_map_pool.h"
 #include "vec/sink/load_stream_stub.h"
 
 namespace doris {
 
-class LoadStreamStubPoolTest : public testing::Test {
+class LoadStreamMapPoolTest : public testing::Test {
 public:
-    LoadStreamStubPoolTest() = default;
-    virtual ~LoadStreamStubPoolTest() = default;
+    LoadStreamMapPoolTest() = default;
+    virtual ~LoadStreamMapPoolTest() = default;
 };
 
-TEST_F(LoadStreamStubPoolTest, test) {
-    LoadStreamStubPool pool;
+TEST_F(LoadStreamMapPoolTest, test) {
+    LoadStreamMapPool pool;
     int64_t src_id = 100;
     PUniqueId load_id;
     load_id.set_lo(1);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to