This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 605aa8d9240 [fix](move-memtable) fix timeout to get tablet schema
(#33256)
605aa8d9240 is described below
commit 605aa8d92409d8830e38754e5b7787a63f1a69a8
Author: Xin Liao <[email protected]>
AuthorDate: Thu Apr 4 21:25:14 2024 +0800
[fix](move-memtable) fix timeout to get tablet schema (#33256)
---
be/src/runtime/exec_env.cpp | 2 +-
be/src/runtime/exec_env.h | 6 +--
be/src/runtime/exec_env_init.cpp | 6 +--
...ream_stub_pool.cpp => load_stream_map_pool.cpp} | 24 +++++-----
...d_stream_stub_pool.h => load_stream_map_pool.h} | 15 +++---
be/src/vec/sink/load_stream_stub.cpp | 2 +-
be/src/vec/sink/volap_table_sink_v2.cpp | 2 -
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 55 +++++++++++-----------
be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +-
...pool_test.cpp => load_stream_stub_map_test.cpp} | 13 +++--
10 files changed, 63 insertions(+), 64 deletions(-)
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 8806ff2900f..306ad94099f 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -34,7 +34,7 @@
#include "util/debug_util.h"
#include "util/time.h"
#include "vec/sink/delta_writer_v2_pool.h"
-#include "vec/sink/load_stream_stub_pool.h"
+#include "vec/sink/load_stream_map_pool.h"
namespace doris {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 1ba35d6b200..e24727d845c 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -82,7 +82,7 @@ class RuntimeQueryStatiticsMgr;
class TMasterInfo;
class LoadChannelMgr;
class LoadStreamMgr;
-class LoadStreamStubPool;
+class LoadStreamMapPool;
class StreamLoadExecutor;
class RoutineLoadTaskExecutor;
class SmallFileMgr;
@@ -264,7 +264,7 @@ public:
}
#endif
- LoadStreamStubPool* load_stream_stub_pool() { return
_load_stream_stub_pool.get(); }
+ LoadStreamMapPool* load_stream_map_pool() { return
_load_stream_map_pool.get(); }
vectorized::DeltaWriterV2Pool* delta_writer_v2_pool() { return
_delta_writer_v2_pool.get(); }
@@ -393,7 +393,7 @@ private:
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
- std::unique_ptr<LoadStreamStubPool> _load_stream_stub_pool;
+ std::unique_ptr<LoadStreamMapPool> _load_stream_map_pool;
std::unique_ptr<vectorized::DeltaWriterV2Pool> _delta_writer_v2_pool;
std::shared_ptr<WalManager> _wal_manager;
DNSCache* _dns_cache = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index c44605dc845..a8b2131d883 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -104,7 +104,7 @@
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/sink/delta_writer_v2_pool.h"
-#include "vec/sink/load_stream_stub_pool.h"
+#include "vec/sink/load_stream_map_pool.h"
#include "vec/spill/spill_stream_manager.h"
#if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) &&
!defined(LEAK_SANITIZER) && \
@@ -245,7 +245,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
- _load_stream_stub_pool = std::make_unique<LoadStreamStubPool>();
+ _load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
_delta_writer_v2_pool = std::make_unique<vectorized::DeltaWriterV2Pool>();
_file_cache_open_fd_cache = std::make_unique<io::FDCache>();
_wal_manager = WalManager::create_shared(this,
config::group_commit_wal_path);
@@ -595,7 +595,7 @@ void ExecEnv::destroy() {
_stream_load_executor.reset();
_memtable_memory_limiter.reset();
_delta_writer_v2_pool.reset();
- _load_stream_stub_pool.reset();
+ _load_stream_map_pool.reset();
_file_cache_open_fd_cache.reset();
// StorageEngine must be destoried before _page_no_cache_mem_tracker.reset
and _cache_manager destory
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp
b/be/src/vec/sink/load_stream_map_pool.cpp
similarity index 85%
rename from be/src/vec/sink/load_stream_stub_pool.cpp
rename to be/src/vec/sink/load_stream_map_pool.cpp
index 3eae49aff77..f335f05e162 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -15,21 +15,22 @@
// specific language governing permissions and limitations
// under the License.
-#include "vec/sink/load_stream_stub_pool.h"
+#include "vec/sink/load_stream_map_pool.h"
#include "util/debug_points.h"
-#include "vec/sink/load_stream_stub.h"
namespace doris {
class TExpr;
LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int
num_streams, int num_use,
- LoadStreamStubPool* pool)
+ LoadStreamMapPool* pool)
: _load_id(load_id),
_src_id(src_id),
_num_streams(num_streams),
_use_cnt(num_use),
- _pool(pool) {
+ _pool(pool),
+ _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
+ _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
DCHECK(num_streams > 0) << "stream num should be greater than 0";
DCHECK(num_use > 0) << "use num should be greater than 0";
}
@@ -41,10 +42,9 @@ std::shared_ptr<Streams>
LoadStreamMap::get_or_create(int64_t dst_id) {
return streams;
}
streams = std::make_shared<Streams>();
- auto schema_map = std::make_shared<IndexToTabletSchema>();
- auto mow_map = std::make_shared<IndexToEnableMoW>();
for (int i = 0; i < _num_streams; i++) {
- streams->emplace_back(new LoadStreamStub(_load_id, _src_id,
schema_map, mow_map));
+ streams->emplace_back(new LoadStreamStub(_load_id, _src_id,
_tablet_schema_for_index,
+
_enable_unique_mow_for_index));
}
_streams_for_node[dst_id] = streams;
return streams;
@@ -103,11 +103,11 @@ Status LoadStreamMap::close_load() {
});
}
-LoadStreamStubPool::LoadStreamStubPool() = default;
+LoadStreamMapPool::LoadStreamMapPool() = default;
-LoadStreamStubPool::~LoadStreamStubPool() = default;
-std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId
load_id, int64_t src_id,
- int
num_streams, int num_use) {
+LoadStreamMapPool::~LoadStreamMapPool() = default;
+std::shared_ptr<LoadStreamMap> LoadStreamMapPool::get_or_create(UniqueId
load_id, int64_t src_id,
+ int
num_streams, int num_use) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<LoadStreamMap> streams = _pool[load_id];
if (streams != nullptr) {
@@ -118,7 +118,7 @@ std::shared_ptr<LoadStreamMap>
LoadStreamStubPool::get_or_create(UniqueId load_i
return streams;
}
-void LoadStreamStubPool::erase(UniqueId load_id) {
+void LoadStreamMapPool::erase(UniqueId load_id) {
std::lock_guard<std::mutex> lock(_mutex);
_pool.erase(load_id);
}
diff --git a/be/src/vec/sink/load_stream_stub_pool.h
b/be/src/vec/sink/load_stream_map_pool.h
similarity index 91%
rename from be/src/vec/sink/load_stream_stub_pool.h
rename to be/src/vec/sink/load_stream_map_pool.h
index 65f3bb66cd2..aad12dba2aa 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -63,19 +63,20 @@
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
+#include "vec/sink/load_stream_stub.h"
namespace doris {
class LoadStreamStub;
-class LoadStreamStubPool;
+class LoadStreamMapPool;
using Streams = std::vector<std::shared_ptr<LoadStreamStub>>;
class LoadStreamMap {
public:
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int
num_use,
- LoadStreamStubPool* pool);
+ LoadStreamMapPool* pool);
std::shared_ptr<Streams> get_or_create(int64_t dst_id);
@@ -103,17 +104,19 @@ private:
std::atomic<int> _use_cnt;
std::mutex _mutex;
std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node;
- LoadStreamStubPool* _pool = nullptr;
+ LoadStreamMapPool* _pool = nullptr;
+ std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
+ std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
std::mutex _tablets_to_commit_mutex;
std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit;
};
-class LoadStreamStubPool {
+class LoadStreamMapPool {
public:
- LoadStreamStubPool();
+ LoadStreamMapPool();
- ~LoadStreamStubPool();
+ ~LoadStreamMapPool();
std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t
src_id, int num_streams,
int num_use);
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 2e118dce5c1..b4a9ac18643 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -250,7 +250,7 @@ Status LoadStreamStub::get_schema(const
std::vector<PTabletID>& tablets) {
PStreamHeader header;
*header.mutable_load_id() = _load_id;
header.set_src_id(_src_id);
- header.set_opcode(doris::PStreamHeader::CLOSE_LOAD);
+ header.set_opcode(doris::PStreamHeader::GET_SCHEMA);
std::ostringstream oss;
oss << "fetching tablet schema from stream " << _stream_id
<< ", load id: " << print_id(_load_id) << ", tablet id:";
diff --git a/be/src/vec/sink/volap_table_sink_v2.cpp
b/be/src/vec/sink/volap_table_sink_v2.cpp
index a41b77ace53..a73ee483bd3 100644
--- a/be/src/vec/sink/volap_table_sink_v2.cpp
+++ b/be/src/vec/sink/volap_table_sink_v2.cpp
@@ -31,8 +31,6 @@
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "vec/sink/delta_writer_v2_pool.h"
-#include "vec/sink/load_stream_stub.h"
-#include "vec/sink/load_stream_stub_pool.h"
namespace doris {
class TExpr;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 42594f13d99..bbe00b589ee 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -48,8 +48,8 @@
#include "vec/core/block.h"
#include "vec/sink/delta_writer_v2_pool.h"
// NOLINTNEXTLINE(unused-includes)
+#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h" // IWYU pragma: keep
-#include "vec/sink/load_stream_stub_pool.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
@@ -96,7 +96,7 @@ Status VTabletWriterV2::_incremental_open_streams(
tablet.set_partition_id(partition->id);
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
- if (!_streams_for_node->contains(node)) {
+ if (!_load_stream_map->contains(node)) {
new_backends.insert(node);
}
_tablets_for_node[node].emplace(tablet_id, tablet);
@@ -112,7 +112,7 @@ Status VTabletWriterV2::_incremental_open_streams(
}
}
for (int64_t dst_id : new_backends) {
- auto streams = _streams_for_node->get_or_create(dst_id);
+ auto streams = _load_stream_map->get_or_create(dst_id);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
@@ -242,7 +242,7 @@ Status VTabletWriterV2::_init(RuntimeState* state,
RuntimeProfile* profile) {
} else {
_delta_writer_for_tablet =
std::make_shared<DeltaWriterV2Map>(_load_id);
}
- _streams_for_node =
ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+ _load_stream_map =
ExecEnv::GetInstance()->load_stream_map_pool()->get_or_create(
_load_id, _backend_id, _stream_per_node, _num_local_sink);
return Status::OK();
}
@@ -263,7 +263,7 @@ Status VTabletWriterV2::open(RuntimeState* state,
RuntimeProfile* profile) {
Status VTabletWriterV2::_open_streams() {
for (auto& [dst_id, _] : _tablets_for_node) {
- auto streams = _streams_for_node->get_or_create(dst_id);
+ auto streams = _load_stream_map->get_or_create(dst_id);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
@@ -361,7 +361,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id,
int64_t partition_id,
tablet.set_tablet_id(tablet_id);
VLOG_DEBUG << fmt::format("_select_streams P{} I{} T{}", partition_id,
index_id, tablet_id);
_tablets_for_node[node_id].emplace(tablet_id, tablet);
-
streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index));
+ streams.emplace_back(_load_stream_map->at(node_id)->at(_stream_index));
RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id,
tablet_id));
}
_stream_index = (_stream_index + 1) % _stream_per_node;
@@ -470,13 +470,13 @@ Status VTabletWriterV2::_cancel(Status status) {
_delta_writer_for_tablet->cancel(status);
_delta_writer_for_tablet.reset();
}
- if (_streams_for_node) {
- _streams_for_node->for_each([status](int64_t dst_id, const Streams&
streams) {
+ if (_load_stream_map) {
+ _load_stream_map->for_each([status](int64_t dst_id, const Streams&
streams) {
for (auto& stream : streams) {
stream->cancel(status);
}
});
- _streams_for_node->release();
+ _load_stream_map->release();
}
return Status::OK();
}
@@ -542,29 +542,28 @@ Status VTabletWriterV2::close(Status exec_status) {
}
_calc_tablets_to_commit();
- const bool is_last_sink = _streams_for_node->release();
+ const bool is_last_sink = _load_stream_map->release();
LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" <<
is_last_sink
<< ", load_id=" << print_id(_load_id);
// send CLOSE_LOAD and close_wait on all streams
if (is_last_sink) {
- RETURN_IF_ERROR(_streams_for_node->close_load());
+ RETURN_IF_ERROR(_load_stream_map->close_load());
SCOPED_TIMER(_close_load_timer);
- RETURN_IF_ERROR(_streams_for_node->for_each_st(
- [this](int64_t dst_id, const Streams& streams) -> Status {
- for (auto& stream : streams) {
- int64_t remain_ms =
-
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
- _timeout_watch.elapsed_time() / 1000 /
1000;
- if (remain_ms <= 0) {
- LOG(WARNING) << "load timed out before close
waiting, load_id="
- << print_id(_load_id);
- return Status::TimedOut("load timed out before
close waiting");
- }
- RETURN_IF_ERROR(stream->close_wait(_state,
remain_ms));
- }
- return Status::OK();
- }));
+ RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t
dst_id,
+ const
Streams& streams) -> Status {
+ for (auto& stream : streams) {
+ int64_t remain_ms =
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+ _timeout_watch.elapsed_time() / 1000 /
1000;
+ if (remain_ms <= 0) {
+ LOG(WARNING) << "load timed out before close waiting,
load_id="
+ << print_id(_load_id);
+ return Status::TimedOut("load timed out before close
waiting");
+ }
+ RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
+ }
+ return Status::OK();
+ }));
}
// calculate and submit commit info
@@ -573,7 +572,7 @@ Status VTabletWriterV2::close(Status exec_status) {
std::unordered_map<int64_t, Status> failed_reason;
std::vector<TTabletCommitInfo> tablet_commit_infos;
- _streams_for_node->for_each([&](int64_t dst_id, const Streams&
streams) {
+ _load_stream_map->for_each([&](int64_t dst_id, const Streams&
streams) {
std::unordered_set<int64_t> known_tablets;
for (const auto& stream : streams) {
for (auto [tablet_id, reason] : stream->failed_tablets()) {
@@ -648,7 +647,7 @@ void VTabletWriterV2::_calc_tablets_to_commit() {
}
LOG(WARNING) << msg;
}
- _streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit);
+ _load_stream_map->save_tablets_to_commit(dst_id, tablets_to_commit);
}
}
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 7785733bf4a..c04cff15cf4 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -217,7 +217,7 @@ private:
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>>
_tablets_for_node;
std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node;
- std::shared_ptr<LoadStreamMap> _streams_for_node;
+ std::shared_ptr<LoadStreamMap> _load_stream_map;
size_t _stream_index = 0;
std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet;
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp
b/be/test/vec/exec/load_stream_stub_map_test.cpp
similarity index 88%
rename from be/test/vec/exec/load_stream_stub_pool_test.cpp
rename to be/test/vec/exec/load_stream_stub_map_test.cpp
index e576db3bdaa..5f8743340ac 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_map_test.cpp
@@ -14,22 +14,21 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-#include "vec/sink/load_stream_stub_pool.h"
-
#include <gtest/gtest.h>
+#include "vec/sink/load_stream_map_pool.h"
#include "vec/sink/load_stream_stub.h"
namespace doris {
-class LoadStreamStubPoolTest : public testing::Test {
+class LoadStreamMapPoolTest : public testing::Test {
public:
- LoadStreamStubPoolTest() = default;
- virtual ~LoadStreamStubPoolTest() = default;
+ LoadStreamMapPoolTest() = default;
+ virtual ~LoadStreamMapPoolTest() = default;
};
-TEST_F(LoadStreamStubPoolTest, test) {
- LoadStreamStubPool pool;
+TEST_F(LoadStreamMapPoolTest, test) {
+ LoadStreamMapPool pool;
int64_t src_id = 100;
PUniqueId load_id;
load_id.set_lo(1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]