This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fe111207a9 [Fix](lazy_open) Fix lazy open null point (#19829)
fe111207a9 is described below
commit fe111207a96a49d96ded1adff937c154e924fdd3
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Tue May 23 09:17:46 2023 +0800
[Fix](lazy_open) Fix lazy open null point (#19829)
---
be/src/runtime/load_channel.cpp | 22 +++++++++++++++-------
be/src/vec/sink/vtablet_sink.cpp | 6 ++++++
2 files changed, 21 insertions(+), 7 deletions(-)
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index e26564962a..f0ed39d7e0 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -87,6 +87,15 @@ Status LoadChannel::open(const PTabletWriterOpenRequest&
params) {
Status LoadChannel::open_partition(const OpenPartitionRequest& params) {
int64_t index_id = params.index_id();
+
+ // check finish
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ auto it = _finished_channel_ids.find(index_id);
+ if (it != _finished_channel_ids.end()) {
+ return Status::OK();
+ }
+ }
std::shared_ptr<TabletsChannel> channel;
{
std::lock_guard<std::mutex> l(_lock);
@@ -94,14 +103,13 @@ Status LoadChannel::open_partition(const
OpenPartitionRequest& params) {
if (it != _tablets_channels.end()) {
channel = it->second;
} else {
- // create a new tablets channel
- TabletsChannelKey key(params.id(), index_id);
- channel = std::make_shared<TabletsChannel>(key, _load_id,
_is_high_priority,
- _self_profile);
- {
- std::lock_guard<SpinLock> l(_tablets_channels_lock);
- _tablets_channels.insert({index_id, channel});
+ fmt::memory_buffer buf;
+ for (auto tablet : params.tablets()) {
+ fmt::format_to(buf, "tablet id:{}", tablet.tablet_id());
}
+ LOG(WARNING) << "should be opened partition index id=" <<
params.index_id()
+ << "tablet ids=" << fmt::to_string(buf);
+ return Status::InternalError("Partition should be opened");
}
}
RETURN_IF_ERROR(channel->open_all_writers_for_partition(params));
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index b8c796c964..ecdefe54d0 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1107,10 +1107,13 @@ Status VOlapTableSink::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+ fmt::memory_buffer buf;
for (auto index_channel : _channels) {
+ fmt::format_to(buf, "index id:{}", index_channel->_index_id);
index_channel->for_each_node_channel(
[](const std::shared_ptr<VNodeChannel>& ch) { ch->open(); });
}
+ LOG(INFO) << "list of open index id = " << fmt::to_string(buf);
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([&index_channel](
@@ -1145,7 +1148,9 @@ void VOlapTableSink::_open_partition(const
VOlapTablePartition* partition) {
auto it = _opened_partitions.find(id);
if (it == _opened_partitions.end()) {
_opened_partitions.insert(id);
+ fmt::memory_buffer buf;
for (int j = 0; j < partition->indexes.size(); ++j) {
+ fmt::format_to(buf, "index id:{}", partition->indexes[j].index_id);
for (const auto& tid : partition->indexes[j].tablets) {
auto it = _channels[j]->_channels_by_tablet.find(tid);
for (const auto& channel : it->second) {
@@ -1153,6 +1158,7 @@ void VOlapTableSink::_open_partition(const
VOlapTablePartition* partition) {
}
}
}
+ LOG(INFO) << "list of lazy open index id = " << fmt::to_string(buf);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]