This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 70f2ac308a [fix](sink) fix OlapTableSink early close causes load failure #21545
70f2ac308a is described below
commit 70f2ac308a46978ba16f9b25f1b1544300d2e208
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Jul 7 14:03:54 2023 +0800
[fix](sink) fix OlapTableSink early close causes load failure #21545
---
be/src/vec/sink/vtablet_sink.cpp | 73 ++++++++++++++++++++++++++++------------
be/src/vec/sink/vtablet_sink.h | 5 ++-
2 files changed, 56 insertions(+), 22 deletions(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index c54666d6d6..cd8716abcb 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -109,7 +109,7 @@ public:
void Run() override {
SCOPED_TRACK_MEMORY_TO_UNKNOWN();
- if (cntl.Failed()) {
+ auto open_partition_failed = [this]() {
std::stringstream ss;
ss << "failed to open partition, error=" <<
berror(this->cntl.ErrorCode())
<< ", error_text=" << this->cntl.ErrorText();
@@ -119,6 +119,14 @@ public:
fmt::format("{}, open failed, err:
{}",
vnode_channel->channel_info(), ss.str()),
-1);
+ };
+ if (cntl.Failed()) {
+ open_partition_failed();
+ } else {
+ Status status(result.status());
+ if (!status.ok()) {
+ open_partition_failed();
+ }
}
done = true;
}
@@ -905,14 +913,12 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
request.release_id();
}
-bool VNodeChannel::is_rpc_done() const {
+bool VNodeChannel::is_send_data_rpc_done() const {
if (_add_block_closure != nullptr) {
- return (_add_batches_finished ||
- (_cancelled && !_add_block_closure->is_packet_in_flight())) &&
- open_partition_finished();
+ return _add_batches_finished || (_cancelled &&
!_add_block_closure->is_packet_in_flight());
} else {
// such as, canceled before open_wait new closure.
- return (_add_batches_finished || _cancelled) &&
open_partition_finished();
+ return _add_batches_finished || _cancelled;
}
}
@@ -1127,6 +1133,7 @@ Status VOlapTableSink::prepare(RuntimeState* state) {
}
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_input_row_desc));
+ _prepare = true;
return Status::OK();
}
@@ -1440,6 +1447,22 @@ void VOlapTableSink::try_close(RuntimeState* state,
Status exec_status) {
if (_try_close) {
return;
}
+
+ if (config::enable_lazy_open_partition && !_open_partition_done) {
+ // open_partition_finished must be before mark_close
+ bool open_partition_done = true;
+ for (const auto& index_channel : _channels) {
+ index_channel->for_each_node_channel(
+ [&open_partition_done](const
std::shared_ptr<VNodeChannel>& ch) {
+ open_partition_done &= ch->open_partition_finished();
+ });
+ }
+ if (!open_partition_done) {
+ return;
+ }
+ _open_partition_done = true;
+ }
+
SCOPED_TIMER(_close_timer);
Status status = exec_status;
if (status.ok()) {
@@ -1475,11 +1498,14 @@ void VOlapTableSink::try_close(RuntimeState* state,
Status exec_status) {
}
bool VOlapTableSink::is_close_done() {
+ if (config::enable_lazy_open_partition && !_open_partition_done) {
+ return false;
+ }
bool close_done = true;
for (const auto& index_channel : _channels) {
index_channel->for_each_node_channel(
[&close_done](const std::shared_ptr<VNodeChannel>& ch) {
- close_done &= ch->is_rpc_done();
+ close_done &= ch->is_send_data_rpc_done();
});
}
return close_done;
@@ -1489,15 +1515,29 @@ Status VOlapTableSink::close(RuntimeState* state,
Status exec_status) {
if (_closed) {
return _close_status;
}
- try_close(state, exec_status);
+ if (!_prepare) {
+ DCHECK(!exec_status.ok());
+ _cancel_all_channel(exec_status);
+ DataSink::close(state, exec_status);
+ _close_status = exec_status;
+ return _close_status;
+ }
+
SCOPED_TIMER(_close_timer);
+ SCOPED_TIMER(_profile->total_time_counter());
+
+ if (config::enable_lazy_open_partition) {
+ for (const auto& index_channel : _channels) {
+ index_channel->for_each_node_channel(
+ [](const std::shared_ptr<VNodeChannel>& ch) {
ch->open_partition_wait(); });
+ }
+ _open_partition_done = true;
+ }
+
+ try_close(state, exec_status);
// If _close_status is not ok, all nodes have been canceled in try_close.
if (_close_status.ok()) {
- DCHECK(exec_status.ok());
auto status = Status::OK();
- // only if status is ok can we call this _profile->total_time_counter().
- // if status is not ok, this sink may not be prepared, so that _profile is null
- SCOPED_TIMER(_profile->total_time_counter());
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter>
node_add_batch_counter_map;
int64_t serialize_batch_ns = 0, queue_push_lock_ns = 0,
actual_consume_ns = 0,
@@ -1506,15 +1546,6 @@ Status VOlapTableSink::close(RuntimeState* state, Status
exec_status) {
num_node_channels = 0;
VNodeChannelStat channel_stat;
{
- if (config::enable_lazy_open_partition) {
- for (const auto& index_channel : _channels) {
- index_channel->for_each_node_channel(
- [](const std::shared_ptr<VNodeChannel>& ch) {
- ch->open_partition_wait();
- });
- }
- }
-
for (const auto& index_channel : _channels) {
if (!status.ok()) {
break;
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index aeec8d1047..ea6b857b8b 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -251,7 +251,7 @@ public:
// 2. just cancel()
void mark_close();
- bool is_rpc_done() const;
+ bool is_send_data_rpc_done() const;
bool is_closed() const { return _is_closed; }
bool is_cancelled() const { return _cancelled; }
@@ -606,6 +606,9 @@ private:
// Save the status of try_close() and close() method
Status _close_status;
bool _try_close = false;
+ bool _prepare = false;
+
+ std::atomic<bool> _open_partition_done {false};
// User can change this config at runtime, avoid it being modified during query or loading process.
bool _transfer_large_data_by_brpc = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]