liaoxin01 commented on code in PR #57767:
URL: https://github.com/apache/doris/pull/57767#discussion_r2550028166
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -367,6 +367,18 @@ Status FragmentMgr::trigger_pipeline_context_report(
// Also, the reported status will always reflect the most recent execution
status,
// including the final status when execution finishes.
void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
+ LOG(INFO) << fmt::format(
Review Comment:
No need to print this log.
##########
be/src/runtime/load_channel_mgr.cpp:
##########
@@ -117,15 +117,36 @@ Status
LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel,
std::lock_guard<std::mutex> l(_lock);
auto it = _load_channels.find(load_id);
if (it == _load_channels.end()) {
- auto* handle = _last_success_channels->lookup(load_id.to_string());
- // success only when eos be true
+ Cache::Handle* handle =
_load_state_channels->lookup(load_id.to_string());
if (handle != nullptr) {
- _last_success_channels->release(handle);
- if (request.has_eos() && request.eos()) {
- is_eof = true;
- return Status::OK();
+ // load is cancelled
+ if (auto* value = _load_state_channels->value(handle); value !=
nullptr) {
+ const auto& cancel_reason =
reinterpret_cast<CacheValue*>(value)->_cancel_reason;
+ _load_state_channels->release(handle);
+ if (!cancel_reason.empty()) {
+ LOG(INFO) << fmt::format(
+ "The channel has been cancelled, load_id = {},
error = {}",
+ print_id(load_id), cancel_reason);
+ return Status::Cancelled(
Review Comment:
```suggestion
return Status::Cancelled(cancel_reason);
```
##########
be/src/runtime/load_channel_mgr.cpp:
##########
@@ -117,15 +117,36 @@ Status
LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel,
std::lock_guard<std::mutex> l(_lock);
auto it = _load_channels.find(load_id);
if (it == _load_channels.end()) {
- auto* handle = _last_success_channels->lookup(load_id.to_string());
- // success only when eos be true
+ Cache::Handle* handle =
_load_state_channels->lookup(load_id.to_string());
if (handle != nullptr) {
- _last_success_channels->release(handle);
- if (request.has_eos() && request.eos()) {
- is_eof = true;
- return Status::OK();
+ // load is cancelled
+ if (auto* value = _load_state_channels->value(handle); value !=
nullptr) {
+ const auto& cancel_reason =
reinterpret_cast<CacheValue*>(value)->_cancel_reason;
+ _load_state_channels->release(handle);
+ if (!cancel_reason.empty()) {
+ LOG(INFO) << fmt::format(
+ "The channel has been cancelled, load_id = {},
error = {}",
+ print_id(load_id), cancel_reason);
+ return Status::Cancelled(
+ "Load channel has been cancelled previously: {},
reason: {}",
+ load_id.to_string(), cancel_reason);
+ } else {
Review Comment:
If cancel_reason is empty, shouldn't the load be treated as successful?
##########
be/src/runtime/load_channel_mgr.cpp:
##########
@@ -193,10 +214,31 @@ Status LoadChannelMgr::cancel(const
PTabletWriterCancelRequest& params) {
std::shared_ptr<LoadChannel> cancelled_channel;
{
std::lock_guard<std::mutex> l(_lock);
- if (_load_channels.find(load_id) != _load_channels.end()) {
+ if (_load_channels.contains(load_id)) {
cancelled_channel = _load_channels[load_id];
_load_channels.erase(load_id);
}
+ // We just need to record the first cancel msg
+ auto* existing_handle =
_load_state_channels->lookup(load_id.to_string());
+ if (existing_handle == nullptr) {
+ if (params.has_cancel_reason() && !params.cancel_reason().empty())
{
+ std::unique_ptr<CacheValue> cancel_reason_ptr =
std::make_unique<CacheValue>();
+ cancel_reason_ptr->_cancel_reason = params.cancel_reason();
+ size_t cache_capacity =
cancel_reason_ptr->_cancel_reason.capacity();
+ auto* handle = _load_state_channels->insert(
+ load_id.to_string(), cancel_reason_ptr.get(), 1,
cache_capacity);
+ cancel_reason_ptr.release();
+ _load_state_channels->release(handle);
+ LOG(INFO) << fmt::format("load_id = {}, record_error reason =
{}",
+ print_id(load_id),
params.cancel_reason());
+ } else {
+ return Status::RpcError(
Review Comment:
We shouldn’t return this error; a missing cancel reason may occur when
upgrading from an older version.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]