Copilot commented on code in PR #59428:
URL: https://github.com/apache/doris/pull/59428#discussion_r2649600494
##########
be/src/http/action/stream_load.cpp:
##########
@@ -398,6 +436,23 @@ void
StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
if (ctx == nullptr) {
return;
}
+
+ // Mark request as closed to prevent callback from accessing invalid
pointers
+ ctx->http_request = nullptr;
+ ctx->stream_load_action = nullptr;
+ ctx->event_base = nullptr;
+
+ // If promise hasn't been set, set it to avoid other code waiting forever
+ try {
+ auto future_status =
ctx->future.wait_for(std::chrono::milliseconds(0));
+ if (future_status == std::future_status::timeout) {
+ // promise hasn't been set, set a cancelled status
+ ctx->promise.set_value(Status::Cancelled("Request closed by
client"));
+ }
+ } catch (...) {
+ // promise already set, ignore
Review Comment:
Exception handling issue: The try-catch block catches all exceptions with
catch(...) but doesn't log what exception was caught or why. This makes
debugging difficult if an unexpected exception occurs. At minimum, this should
log that an exception was caught, or better yet, only catch the specific
exception type expected from set_value when the promise is already satisfied.
```suggestion
} catch (const std::future_error& e) {
// Promise may already be satisfied or in an invalid state; ignore
this benign race.
VLOG_ROW << "Ignoring std::future_error while setting cancel status
in free_handler_ctx: "
<< e.what();
} catch (const std::exception& e) {
// Log unexpected standard exceptions to aid debugging but do not
let them escape.
LOG(WARNING) << "Unexpected exception while setting cancel status in
free_handler_ctx: "
<< e.what();
} catch (...) {
// Log unknown non-standard exceptions.
LOG(WARNING) << "Unknown non-standard exception while setting cancel
status in "
"free_handler_ctx.";
```
##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -123,26 +161,75 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
}
}
ctx->write_data_cost_nanos = MonotonicNanos() -
ctx->start_write_data_nanos;
- ctx->promise.set_value(*status);
- if (!status->ok() && ctx->body_sink != nullptr) {
- // In some cases, the load execution is exited early.
- // For example, when max_filter_ratio is 0 and illegal data is
encountered
- // during stream loading, the entire load process is terminated
early.
- // However, the http connection may still be sending data to
stream_load_pipe
- // and waiting for it to be consumed.
- // Therefore, we need to actively cancel to end the pipe.
- ctx->body_sink->cancel(status->to_string());
+ // If event_base is set, use async callback; otherwise use sync mode
+ if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
+ ctx->stream_load_action != nullptr) {
+ // Async mode: use event_base_once to trigger callback in libevent
thread
+ // Note: exec_fragment callback should only be called once (after
fragment execution)
+ // If it's called multiple times, there's a bug that should be
fixed
+
+ // Save state for callback (value copy to ensure thread safety)
+ Status fragment_status = *status;
+
+ // Save other state needed in callback
+ bool need_rollback = ctx->need_rollback;
+ bool need_commit_self = ctx->need_commit_self;
+ bool body_sink_cancelled = (ctx->body_sink != nullptr &&
ctx->body_sink->cancelled());
+
+ // Allocate callback data on heap since event_base_once may not
execute immediately
+ auto* callback_data =
+ new StreamLoadAsyncCallbackData {.ctx = ctx,
+ .fragment_status =
fragment_status,
+ .need_rollback =
need_rollback,
+ .need_commit_self =
need_commit_self,
+ .body_sink_cancelled =
body_sink_cancelled};
+
+ // Use event_base_once to trigger callback in libevent thread
+ // This is a one-shot event, no timer needed
+ struct timeval tv {};
+ tv.tv_sec = 0;
+ tv.tv_usec = 0; // Trigger immediately
+ int ret = event_base_once(ctx->event_base, -1, EV_TIMEOUT,
stream_load_async_callback,
+ callback_data, &tv);
+ if (ret != 0) {
+ // event_base_once failed, log error
+ // Note: In async mode, no code is waiting for promise
(_handle() has returned),
+ // so no need to set promise
+ LOG(ERROR) << "event_base_once failed, cannot send async
callback, ctx="
+ << ctx->id.to_string() << ", errno=" << errno;
Review Comment:
Memory leak: If event_base_once fails (line 195), the callback_data
allocated on lines 181-186 is never deleted, causing a memory leak. The
callback_data should be deleted in the error path.
```suggestion
<< ctx->id.to_string() << ", errno=" << errno;
// Clean up callback_data since callback will never be
invoked on failure
delete callback_data;
```
##########
be/src/http/action/stream_load.cpp:
##########
@@ -924,4 +979,110 @@ Status
StreamLoadAction::_handle_group_commit(HttpRequest* req,
return Status::OK();
}
+void
StreamLoadAction::continue_handle_after_future(std::shared_ptr<StreamLoadContext>
ctx,
+ Status fragment_status,
bool need_rollback,
+ bool need_commit_self,
+ bool body_sink_cancelled) {
+ // Handle body_sink cancel if needed
+ if (!fragment_status.ok() && ctx->body_sink != nullptr) {
+ ctx->body_sink->cancel(fragment_status.to_string());
+ }
+
+ // Handle need_commit_self case
+ if (need_commit_self && ctx->body_sink != nullptr) {
+ if (body_sink_cancelled || !fragment_status.ok()) {
+ ctx->status = fragment_status;
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ } else {
+ // Execute commit in callback (this was originally in
exec_fragment)
+
static_cast<void>(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ }
+ }
+
+ // Continue with original subsequent logic
+ if (!fragment_status.ok()) {
+ ctx->status = fragment_status;
+ if (need_rollback) {
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+ ctx->need_rollback = false;
+ }
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ }
+
+ if (ctx->group_commit) {
+ LOG(INFO) << "skip commit because this is group commit, pipe_id=" <<
ctx->id.to_string();
+ ctx->status = Status::OK();
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ }
+
+ if (ctx->two_phase_commit) {
+ int64_t pre_commit_start_time = MonotonicNanos();
+ Status commit_st =
_exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
+ ctx->pre_commit_txn_cost_nanos = MonotonicNanos() -
pre_commit_start_time;
+ ctx->status = commit_st;
+ } else {
+ int64_t commit_and_publish_start_time = MonotonicNanos();
+ Status commit_st =
_exec_env->stream_load_executor()->commit_txn(ctx.get());
+ ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() -
commit_and_publish_start_time;
+ g_stream_load_commit_and_publish_latency_ms
+ << ctx->commit_and_publish_txn_cost_nanos / 1000000;
+ ctx->status = commit_st;
+ }
+
+ _finalize_request(ctx->http_request, ctx);
+}
+
+void StreamLoadAction::_finalize_request(HttpRequest* req,
std::shared_ptr<StreamLoadContext> ctx) {
+ ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+ if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+ if (ctx->need_rollback) {
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+ ctx->need_rollback = false;
+ }
Review Comment:
Duplicated rollback logic: The rollback logic appears in multiple places -
in continue_handle_after_future (lines 995, 1010) and in _finalize_request
(line 1046). This duplication could lead to maintenance issues if the rollback
behavior needs to change. Additionally, there's a risk of double rollback if
both code paths execute: the need_rollback flag prevents this on the path that
clears it, but the need_commit_self branch calls rollback_txn without clearing
ctx->need_rollback, so _finalize_request may roll back a second time when the
fragment status is not OK.
```suggestion
auto rollback_if_needed = [this](const
std::shared_ptr<StreamLoadContext>& c) {
if (c->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(c.get());
c->need_rollback = false;
}
};
rollback_if_needed(ctx);
```
##########
be/src/http/action/stream_load.cpp:
##########
@@ -160,7 +170,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
}
}
-Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
+Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx,
HttpRequest* req) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes,
body_bytes=" << ctx->body_bytes
Review Comment:
Spelling error: "recevie" should be spelled "receive".
```suggestion
LOG(WARNING) << "receive body don't equal with body bytes,
body_bytes=" << ctx->body_bytes
```
##########
be/src/runtime/stream_load/stream_load_context.h:
##########
@@ -256,6 +261,13 @@ class StreamLoadContext {
std::string qualified_user;
std::string cloud_cluster;
+ // Fields for async processing (Scheme B: libevent deferred callback)
+ // These fields are set in _on_header() before execute_plan_fragment is
called
+ // to avoid race conditions
+ struct event_base* event_base = nullptr; // libevent event loop
+ HttpRequest* http_request = nullptr; // HTTP request reference
+ StreamLoadAction* stream_load_action = nullptr; // StreamLoadAction
instance pointer
Review Comment:
Raw pointer members without ownership semantics: The StreamLoadContext class
now contains raw pointers (event_base, http_request, stream_load_action) that
don't express ownership. These pointers are set to nullptr in free_handler_ctx
but there's no clear lifetime management. If the HttpRequest or
StreamLoadAction is destroyed while the async callback is pending or executing,
accessing these pointers will cause undefined behavior. Consider using weak_ptr
or another mechanism to safely check if these objects are still valid.
##########
be/src/http/action/stream_load.cpp:
##########
@@ -176,26 +186,38 @@ Status
StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx,
mocked));
}
- // wait stream load finish
- RETURN_IF_ERROR(ctx->future.get());
+ // Check if event_base is set (should be set in _on_header)
+ // If not set, fallback to sync wait
+ if (ctx->event_base == nullptr || ctx->http_request == nullptr ||
+ ctx->stream_load_action == nullptr) {
+ // event_base not set (should not happen), fallback to sync wait
+ LOG(WARNING) << "event_base not set, fallback to sync wait, ctx=" <<
ctx->id.to_string();
+ RETURN_IF_ERROR(ctx->future.get());
- if (ctx->group_commit) {
- LOG(INFO) << "skip commit because this is group commit, pipe_id=" <<
ctx->id.to_string();
+ // Continue with original logic
+ if (ctx->group_commit) {
+ LOG(INFO) << "skip commit because this is group commit, pipe_id="
+ << ctx->id.to_string();
+ return Status::OK();
+ }
+
+ if (ctx->two_phase_commit) {
+ int64_t pre_commit_start_time = MonotonicNanos();
+
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
+ ctx->pre_commit_txn_cost_nanos = MonotonicNanos() -
pre_commit_start_time;
+ } else {
+ int64_t commit_and_publish_start_time = MonotonicNanos();
+
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+ ctx->commit_and_publish_txn_cost_nanos =
+ MonotonicNanos() - commit_and_publish_start_time;
+ g_stream_load_commit_and_publish_latency_ms
+ << ctx->commit_and_publish_txn_cost_nanos / 1000000;
+ }
return Status::OK();
}
- if (ctx->two_phase_commit) {
- int64_t pre_commit_start_time = MonotonicNanos();
-
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
- ctx->pre_commit_txn_cost_nanos = MonotonicNanos() -
pre_commit_start_time;
- } else {
- // If put file success we need commit this load
- int64_t commit_and_publish_start_time = MonotonicNanos();
-
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
- ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() -
commit_and_publish_start_time;
- g_stream_load_commit_and_publish_latency_ms
- << ctx->commit_and_publish_txn_cost_nanos / 1000000;
- }
+ // event_base is set, use async mode, return directly
+ // Future completion will trigger callback in stream_load_executor
Review Comment:
Incorrect comment: The comment states "Future completion will trigger
callback in stream_load_executor" but this is misleading. The callback is not
triggered by future completion - it's triggered immediately by event_base_once
with a zero timeout. The future/promise mechanism is actually bypassed in async
mode, as noted in the code itself (lines 197-198, 202, 204).
```suggestion
// The continuation runs in a zero-timeout event_base_once callback
// scheduled by stream_load_executor once fragment execution finishes
```
##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -123,26 +161,75 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
}
}
ctx->write_data_cost_nanos = MonotonicNanos() -
ctx->start_write_data_nanos;
- ctx->promise.set_value(*status);
- if (!status->ok() && ctx->body_sink != nullptr) {
- // In some cases, the load execution is exited early.
- // For example, when max_filter_ratio is 0 and illegal data is
encountered
- // during stream loading, the entire load process is terminated
early.
- // However, the http connection may still be sending data to
stream_load_pipe
- // and waiting for it to be consumed.
- // Therefore, we need to actively cancel to end the pipe.
- ctx->body_sink->cancel(status->to_string());
+ // If event_base is set, use async callback; otherwise use sync mode
+ if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
+ ctx->stream_load_action != nullptr) {
+ // Async mode: use event_base_once to trigger callback in libevent
thread
+ // Note: exec_fragment callback should only be called once (after
fragment execution)
+ // If it's called multiple times, there's a bug that should be
fixed
+
+ // Save state for callback (value copy to ensure thread safety)
+ Status fragment_status = *status;
+
+ // Save other state needed in callback
+ bool need_rollback = ctx->need_rollback;
+ bool need_commit_self = ctx->need_commit_self;
+ bool body_sink_cancelled = (ctx->body_sink != nullptr &&
ctx->body_sink->cancelled());
+
+ // Allocate callback data on heap since event_base_once may not
execute immediately
+ auto* callback_data =
+ new StreamLoadAsyncCallbackData {.ctx = ctx,
+ .fragment_status =
fragment_status,
+ .need_rollback =
need_rollback,
+ .need_commit_self =
need_commit_self,
+ .body_sink_cancelled =
body_sink_cancelled};
Review Comment:
Unsafe shared_ptr usage across threads: The callback_data contains a
shared_ptr to StreamLoadContext (ctx). This shared_ptr is created in one thread
(fragment manager thread) and the callback_data is then passed to potentially
execute in another thread (libevent thread). While shared_ptr itself is
thread-safe for reference counting, the callback deletes the callback_data
(line 92) which will decrement the ref count. If this happens concurrently with
other operations on the shared_ptr in other threads without proper
synchronization, it could lead to issues. More critically, the ctx object
members (like http_request, stream_load_action) are being accessed and modified
across threads without synchronization.
##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -54,6 +57,41 @@
namespace doris {
using namespace ErrorCode;
+// Callback data structure for async stream load processing
+struct StreamLoadAsyncCallbackData {
+ std::shared_ptr<StreamLoadContext> ctx;
+ Status fragment_status;
+ bool need_rollback;
+ bool need_commit_self;
+ bool body_sink_cancelled;
+};
+
+// C-style callback function for event_base_once
+static void stream_load_async_callback(evutil_socket_t, short, void* arg) {
+ auto* data = static_cast<StreamLoadAsyncCallbackData*>(arg);
+ // This callback executes in libevent thread
+ // Check validity again (request may have been closed during wait)
+ if (data->ctx->stream_load_action == nullptr || data->ctx->http_request ==
nullptr) {
+ LOG(WARNING) << "stream_load_action or http_request is null in
callback, "
+ << "request may be closed, ctx=" <<
data->ctx->id.to_string();
+ // Note: No need to set promise here because:
+ // 1. If request is closed, free_handler_ctx should have set promise
(to Cancelled)
+ // 2. In async mode, _handle() has returned, won't wait for promise
+ // 3. If promise is already set, setting again will throw exception
+ // 4. Even if promise is not set, setting here is meaningless since
request is closed
+ delete data;
+ return;
+ }
+
+ // Continue with subsequent logic
+ data->ctx->stream_load_action->continue_handle_after_future(
+ data->ctx, data->fragment_status, data->need_rollback,
data->need_commit_self,
+ data->body_sink_cancelled);
Review Comment:
Race condition: The validity check of ctx->stream_load_action and
ctx->http_request happens in the callback (libevent thread), but these fields
can be set to nullptr in free_handler_ctx (lines 441-443 in stream_load.cpp)
which may run concurrently. Without synchronization, there's a
time-of-check-time-of-use (TOCTOU) race where the pointers could become null
between the check on line 74 and their use on lines 87-89.
##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -54,6 +57,41 @@
namespace doris {
using namespace ErrorCode;
+// Callback data structure for async stream load processing
+struct StreamLoadAsyncCallbackData {
+ std::shared_ptr<StreamLoadContext> ctx;
+ Status fragment_status;
+ bool need_rollback;
+ bool need_commit_self;
+ bool body_sink_cancelled;
+};
+
+// C-style callback function for event_base_once
+static void stream_load_async_callback(evutil_socket_t, short, void* arg) {
+ auto* data = static_cast<StreamLoadAsyncCallbackData*>(arg);
+ // This callback executes in libevent thread
+ // Check validity again (request may have been closed during wait)
+ if (data->ctx->stream_load_action == nullptr || data->ctx->http_request ==
nullptr) {
+ LOG(WARNING) << "stream_load_action or http_request is null in
callback, "
+ << "request may be closed, ctx=" <<
data->ctx->id.to_string();
+ // Note: No need to set promise here because:
+ // 1. If request is closed, free_handler_ctx should have set promise
(to Cancelled)
+ // 2. In async mode, _handle() has returned, won't wait for promise
+ // 3. If promise is already set, setting again will throw exception
+ // 4. Even if promise is not set, setting here is meaningless since
request is closed
Review Comment:
Unclear control flow comment: The comment states "No need to set promise
here because..." but then lists several conflicting reasons. Specifically,
point 2 says "_handle() has returned, won't wait for promise" but point 4 says
"Even if promise is not set, setting here is meaningless since request is
closed". This creates confusion about the actual contract - is the promise
required to be set or not? The comment should clearly state the expected
behavior.
```suggestion
// Note: Do not touch the associated promise here:
// 1. If the request has already been closed, free_handler_ctx is
responsible for
// fulfilling the promise (typically with a Cancelled status).
// 2. In async mode, _handle() has already returned and no caller is
waiting on this
// callback to satisfy the promise.
// 3. The promise may already have been set; attempting to set it
again would be unsafe
// and could throw an exception.
```
##########
be/src/http/action/stream_load.cpp:
##########
@@ -924,4 +979,110 @@ Status
StreamLoadAction::_handle_group_commit(HttpRequest* req,
return Status::OK();
}
+void
StreamLoadAction::continue_handle_after_future(std::shared_ptr<StreamLoadContext>
ctx,
+ Status fragment_status,
bool need_rollback,
+ bool need_commit_self,
+ bool body_sink_cancelled) {
+ // Handle body_sink cancel if needed
+ if (!fragment_status.ok() && ctx->body_sink != nullptr) {
+ ctx->body_sink->cancel(fragment_status.to_string());
+ }
+
+ // Handle need_commit_self case
+ if (need_commit_self && ctx->body_sink != nullptr) {
+ if (body_sink_cancelled || !fragment_status.ok()) {
+ ctx->status = fragment_status;
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ } else {
+ // Execute commit in callback (this was originally in
exec_fragment)
+
static_cast<void>(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ }
+ }
+
+ // Continue with original subsequent logic
+ if (!fragment_status.ok()) {
+ ctx->status = fragment_status;
+ if (need_rollback) {
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+ ctx->need_rollback = false;
+ }
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ }
+
+ if (ctx->group_commit) {
+ LOG(INFO) << "skip commit because this is group commit, pipe_id=" <<
ctx->id.to_string();
+ ctx->status = Status::OK();
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ }
+
+ if (ctx->two_phase_commit) {
+ int64_t pre_commit_start_time = MonotonicNanos();
+ Status commit_st =
_exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
+ ctx->pre_commit_txn_cost_nanos = MonotonicNanos() -
pre_commit_start_time;
+ ctx->status = commit_st;
+ } else {
+ int64_t commit_and_publish_start_time = MonotonicNanos();
+ Status commit_st =
_exec_env->stream_load_executor()->commit_txn(ctx.get());
+ ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() -
commit_and_publish_start_time;
+ g_stream_load_commit_and_publish_latency_ms
+ << ctx->commit_and_publish_txn_cost_nanos / 1000000;
+ ctx->status = commit_st;
+ }
+
+ _finalize_request(ctx->http_request, ctx);
+}
+
+void StreamLoadAction::_finalize_request(HttpRequest* req,
std::shared_ptr<StreamLoadContext> ctx) {
+ ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+ if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+ if (ctx->need_rollback) {
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+ ctx->need_rollback = false;
+ }
+ if (ctx->body_sink != nullptr) {
+ ctx->body_sink->cancel(ctx->status.to_string());
+ }
+ }
+
+ auto str = ctx->to_json();
+ str = str + '\n';
+ HttpChannel::send_reply(req, str);
Review Comment:
Potential use-after-free: In _finalize_request, ctx->http_request is passed
as a parameter and used on line 1056 to send a reply. However, the http_request
pointer in ctx may have been set to nullptr by free_handler_ctx (line 441)
which could be called if the client disconnects. While the parameter is passed
by value, if free_handler_ctx is called concurrently or the underlying
HttpRequest object is destroyed, this will cause a crash.
##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -123,26 +161,75 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
}
}
ctx->write_data_cost_nanos = MonotonicNanos() -
ctx->start_write_data_nanos;
- ctx->promise.set_value(*status);
- if (!status->ok() && ctx->body_sink != nullptr) {
- // In some cases, the load execution is exited early.
- // For example, when max_filter_ratio is 0 and illegal data is
encountered
- // during stream loading, the entire load process is terminated
early.
- // However, the http connection may still be sending data to
stream_load_pipe
- // and waiting for it to be consumed.
- // Therefore, we need to actively cancel to end the pipe.
- ctx->body_sink->cancel(status->to_string());
+ // If event_base is set, use async callback; otherwise use sync mode
+ if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
+ ctx->stream_load_action != nullptr) {
+ // Async mode: use event_base_once to trigger callback in libevent
thread
+ // Note: exec_fragment callback should only be called once (after
fragment execution)
+ // If it's called multiple times, there's a bug that should be
fixed
+
+ // Save state for callback (value copy to ensure thread safety)
+ Status fragment_status = *status;
+
+ // Save other state needed in callback
+ bool need_rollback = ctx->need_rollback;
+ bool need_commit_self = ctx->need_commit_self;
+ bool body_sink_cancelled = (ctx->body_sink != nullptr &&
ctx->body_sink->cancelled());
Review Comment:
Double access to body_sink: The code reads ctx->body_sink and checks if it's
cancelled on line 178, but this happens in the fragment manager thread. Later,
the callback may access body_sink in a different thread (libevent thread). If
body_sink is not thread-safe or can be modified/destroyed between these
accesses, this could lead to undefined behavior. The body_sink_cancelled flag
is captured but the sink itself may still be accessed in
continue_handle_after_future.
##########
be/src/http/action/stream_load.cpp:
##########
@@ -924,4 +979,110 @@ Status
StreamLoadAction::_handle_group_commit(HttpRequest* req,
return Status::OK();
}
+void
StreamLoadAction::continue_handle_after_future(std::shared_ptr<StreamLoadContext>
ctx,
+ Status fragment_status,
bool need_rollback,
+ bool need_commit_self,
+ bool body_sink_cancelled) {
+ // Handle body_sink cancel if needed
+ if (!fragment_status.ok() && ctx->body_sink != nullptr) {
+ ctx->body_sink->cancel(fragment_status.to_string());
+ }
+
+ // Handle need_commit_self case
+ if (need_commit_self && ctx->body_sink != nullptr) {
+ if (body_sink_cancelled || !fragment_status.ok()) {
+ ctx->status = fragment_status;
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ } else {
+ // Execute commit in callback (this was originally in
exec_fragment)
+
static_cast<void>(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ }
+ }
+
+ // Continue with original subsequent logic
+ if (!fragment_status.ok()) {
+ ctx->status = fragment_status;
+ if (need_rollback) {
+ _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+ ctx->need_rollback = false;
+ }
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ }
+
+ if (ctx->group_commit) {
+ LOG(INFO) << "skip commit because this is group commit, pipe_id=" <<
ctx->id.to_string();
+ ctx->status = Status::OK();
+ _finalize_request(ctx->http_request, ctx);
+ return;
+ }
+
+ if (ctx->two_phase_commit) {
+ int64_t pre_commit_start_time = MonotonicNanos();
+ Status commit_st =
_exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
+ ctx->pre_commit_txn_cost_nanos = MonotonicNanos() -
pre_commit_start_time;
+ ctx->status = commit_st;
+ } else {
+ int64_t commit_and_publish_start_time = MonotonicNanos();
+ Status commit_st =
_exec_env->stream_load_executor()->commit_txn(ctx.get());
+ ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() -
commit_and_publish_start_time;
+ g_stream_load_commit_and_publish_latency_ms
+ << ctx->commit_and_publish_txn_cost_nanos / 1000000;
+ ctx->status = commit_st;
+ }
+
+ _finalize_request(ctx->http_request, ctx);
Review Comment:
Potential null pointer dereference: In continue_handle_after_future,
ctx->http_request is accessed on lines 996, 1001, 1013, 1020, and 1038, but it
can be set to nullptr in free_handler_ctx (line 441). Even though there's a
check on line 74 of stream_load_executor.cpp, there's no guarantee that
http_request won't become null between that check and when
continue_handle_after_future is called, creating a potential crash.
##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -123,26 +161,75 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
}
}
ctx->write_data_cost_nanos = MonotonicNanos() -
ctx->start_write_data_nanos;
- ctx->promise.set_value(*status);
- if (!status->ok() && ctx->body_sink != nullptr) {
- // In some cases, the load execution is exited early.
- // For example, when max_filter_ratio is 0 and illegal data is
encountered
- // during stream loading, the entire load process is terminated
early.
- // However, the http connection may still be sending data to
stream_load_pipe
- // and waiting for it to be consumed.
- // Therefore, we need to actively cancel to end the pipe.
- ctx->body_sink->cancel(status->to_string());
+ // If event_base is set, use async callback; otherwise use sync mode
+ if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
+ ctx->stream_load_action != nullptr) {
+ // Async mode: use event_base_once to trigger callback in libevent
thread
+ // Note: exec_fragment callback should only be called once (after
fragment execution)
+ // If it's called multiple times, there's a bug that should be
fixed
+
+ // Save state for callback (value copy to ensure thread safety)
+ Status fragment_status = *status;
+
+ // Save other state needed in callback
+ bool need_rollback = ctx->need_rollback;
+ bool need_commit_self = ctx->need_commit_self;
+ bool body_sink_cancelled = (ctx->body_sink != nullptr &&
ctx->body_sink->cancelled());
+
+ // Allocate callback data on heap since event_base_once may not
execute immediately
+ auto* callback_data =
+ new StreamLoadAsyncCallbackData {.ctx = ctx,
+ .fragment_status =
fragment_status,
+ .need_rollback =
need_rollback,
+ .need_commit_self =
need_commit_self,
+ .body_sink_cancelled =
body_sink_cancelled};
+
+ // Use event_base_once to trigger callback in libevent thread
+ // This is a one-shot event, no timer needed
+ struct timeval tv {};
+ tv.tv_sec = 0;
+ tv.tv_usec = 0; // Trigger immediately
+ int ret = event_base_once(ctx->event_base, -1, EV_TIMEOUT,
stream_load_async_callback,
+ callback_data, &tv);
+ if (ret != 0) {
+ // event_base_once failed, log error
+ // Note: In async mode, no code is waiting for promise
(_handle() has returned),
+ // so no need to set promise
+ LOG(ERROR) << "event_base_once failed, cannot send async
callback, ctx="
+ << ctx->id.to_string() << ", errno=" << errno;
+ // Note: Cannot send HTTP response in this case, request may
be invalid
+ // No need to set promise, as no code is waiting in async mode
Review Comment:
Inconsistent error handling: When event_base_once fails (line 195), the
error is logged but no corrective action is taken. The request will be left in
an incomplete state - the fragment execution has finished but no response will
be sent to the client, and the transaction state is unclear. At minimum, this
should attempt to finalize the request or set an error status that will be
handled elsewhere.
```suggestion
// event_base_once failed, log error and fall back to
executing callback
// synchronously in the current thread to avoid leaving the
request incomplete.
LOG(ERROR) << "event_base_once failed, cannot send async
callback, ctx="
<< ctx->id.to_string() << ", errno=" << errno;
// Best-effort fallback: directly invoke the callback so
that the normal
// completion logic (including HTTP response / txn handling)
still runs.
// The callback will delete callback_data.
stream_load_async_callback(-1, EV_TIMEOUT, callback_data);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]