Copilot commented on code in PR #59428:
URL: https://github.com/apache/doris/pull/59428#discussion_r2649600494


##########
be/src/http/action/stream_load.cpp:
##########
@@ -398,6 +436,23 @@ void StreamLoadAction::free_handler_ctx(std::shared_ptr<void> param) {
     if (ctx == nullptr) {
         return;
     }
+
+    // Mark request as closed to prevent callback from accessing invalid pointers
+    ctx->http_request = nullptr;
+    ctx->stream_load_action = nullptr;
+    ctx->event_base = nullptr;
+
+    // If promise hasn't been set, set it to avoid other code waiting forever
+    try {
+        auto future_status = ctx->future.wait_for(std::chrono::milliseconds(0));
+        if (future_status == std::future_status::timeout) {
+            // promise hasn't been set, set a cancelled status
+            ctx->promise.set_value(Status::Cancelled("Request closed by client"));
+        }
+    } catch (...) {
+        // promise already set, ignore

Review Comment:
   Exception handling issue: The try-catch block catches all exceptions with 
catch(...) but doesn't log what exception was caught or why. This makes 
debugging difficult if an unexpected exception occurs. At minimum, this should 
log that an exception was caught, or better yet, only catch the specific 
exception type expected from set_value when the promise is already satisfied.
   ```suggestion
       } catch (const std::future_error& e) {
           // Promise may already be satisfied or in an invalid state; ignore this benign race.
           VLOG_ROW << "Ignoring std::future_error while setting cancel status in free_handler_ctx: "
                    << e.what();
       } catch (const std::exception& e) {
           // Log unexpected standard exceptions to aid debugging but do not let them escape.
           LOG(WARNING) << "Unexpected exception while setting cancel status in free_handler_ctx: "
                        << e.what();
       } catch (...) {
           // Log unknown non-standard exceptions.
           LOG(WARNING) << "Unknown non-standard exception while setting cancel status in "
                           "free_handler_ctx.";
   ```



##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -123,26 +161,75 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
             }
         }
         ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-        ctx->promise.set_value(*status);
 
-        if (!status->ok() && ctx->body_sink != nullptr) {
-            // In some cases, the load execution is exited early.
-            // For example, when max_filter_ratio is 0 and illegal data is encountered
-            // during stream loading, the entire load process is terminated early.
-            // However, the http connection may still be sending data to stream_load_pipe
-            // and waiting for it to be consumed.
-            // Therefore, we need to actively cancel to end the pipe.
-            ctx->body_sink->cancel(status->to_string());
+        // If event_base is set, use async callback; otherwise use sync mode
+        if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
+            ctx->stream_load_action != nullptr) {
+            // Async mode: use event_base_once to trigger callback in libevent thread
+            // Note: exec_fragment callback should only be called once (after fragment execution)
+            // If it's called multiple times, there's a bug that should be fixed
+
+            // Save state for callback (value copy to ensure thread safety)
+            Status fragment_status = *status;
+
+            // Save other state needed in callback
+            bool need_rollback = ctx->need_rollback;
+            bool need_commit_self = ctx->need_commit_self;
+            bool body_sink_cancelled = (ctx->body_sink != nullptr && ctx->body_sink->cancelled());
+
+            // Allocate callback data on heap since event_base_once may not execute immediately
+            auto* callback_data =
+                    new StreamLoadAsyncCallbackData {.ctx = ctx,
+                                                     .fragment_status = fragment_status,
+                                                     .need_rollback = need_rollback,
+                                                     .need_commit_self = need_commit_self,
+                                                     .body_sink_cancelled = body_sink_cancelled};
+
+            // Use event_base_once to trigger callback in libevent thread
+            // This is a one-shot event, no timer needed
+            struct timeval tv {};
+            tv.tv_sec = 0;
+            tv.tv_usec = 0; // Trigger immediately
+            int ret = event_base_once(ctx->event_base, -1, EV_TIMEOUT, stream_load_async_callback,
+                                      callback_data, &tv);
+            if (ret != 0) {
+                // event_base_once failed, log error
+                // Note: In async mode, no code is waiting for promise (_handle() has returned),
+                // so no need to set promise
+                LOG(ERROR) << "event_base_once failed, cannot send async callback, ctx="
+                           << ctx->id.to_string() << ", errno=" << errno;

Review Comment:
   Memory leak: If event_base_once fails (line 195), the callback_data
allocated on lines 181-186 is never deleted, causing a memory leak. The
callback_data should be deleted in the error path.
   ```suggestion
                              << ctx->id.to_string() << ", errno=" << errno;
                   // Clean up callback_data since callback will never be invoked on failure
                   delete callback_data;
   ```



##########
be/src/http/action/stream_load.cpp:
##########
@@ -924,4 +979,110 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     return Status::OK();
 }
 
+void StreamLoadAction::continue_handle_after_future(std::shared_ptr<StreamLoadContext> ctx,
+                                                    Status fragment_status, bool need_rollback,
+                                                    bool need_commit_self,
+                                                    bool body_sink_cancelled) {
+    // Handle body_sink cancel if needed
+    if (!fragment_status.ok() && ctx->body_sink != nullptr) {
+        ctx->body_sink->cancel(fragment_status.to_string());
+    }
+
+    // Handle need_commit_self case
+    if (need_commit_self && ctx->body_sink != nullptr) {
+        if (body_sink_cancelled || !fragment_status.ok()) {
+            ctx->status = fragment_status;
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            _finalize_request(ctx->http_request, ctx);
+            return;
+        } else {
+            // Execute commit in callback (this was originally in exec_fragment)
+            static_cast<void>(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+            _finalize_request(ctx->http_request, ctx);
+            return;
+        }
+    }
+
+    // Continue with original subsequent logic
+    if (!fragment_status.ok()) {
+        ctx->status = fragment_status;
+        if (need_rollback) {
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            ctx->need_rollback = false;
+        }
+        _finalize_request(ctx->http_request, ctx);
+        return;
+    }
+
+    if (ctx->group_commit) {
+        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
+        ctx->status = Status::OK();
+        _finalize_request(ctx->http_request, ctx);
+        return;
+    }
+
+    if (ctx->two_phase_commit) {
+        int64_t pre_commit_start_time = MonotonicNanos();
+        Status commit_st = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
+        ctx->status = commit_st;
+    } else {
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        Status commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
+        g_stream_load_commit_and_publish_latency_ms
+                << ctx->commit_and_publish_txn_cost_nanos / 1000000;
+        ctx->status = commit_st;
+    }
+
+    _finalize_request(ctx->http_request, ctx);
+}
+
+void StreamLoadAction::_finalize_request(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) {
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->need_rollback) {
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            ctx->need_rollback = false;
+        }

Review Comment:
   Duplicated rollback logic: The rollback logic appears in multiple places -
in continue_handle_after_future (lines 995, 1010) and in _finalize_request
(line 1046). This duplication could lead to maintenance issues if the rollback
behavior needs to change. Additionally, there's a risk of double rollback if
both code paths execute, though the need_rollback flag should prevent this in
most cases.
   ```suggestion
           auto rollback_if_needed = [this](const std::shared_ptr<StreamLoadContext>& c) {
               if (c->need_rollback) {
                   _exec_env->stream_load_executor()->rollback_txn(c.get());
                   c->need_rollback = false;
               }
           };
           rollback_if_needed(ctx);
   ```



##########
be/src/http/action/stream_load.cpp:
##########
@@ -160,7 +170,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
     }
 }
 
-Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
+Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
     if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
         LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes

Review Comment:
   Spelling error: "recevie" should be spelled "receive".
   ```suggestion
           LOG(WARNING) << "receive body don't equal with body bytes, body_bytes=" << ctx->body_bytes
   ```



##########
be/src/runtime/stream_load/stream_load_context.h:
##########
@@ -256,6 +261,13 @@ class StreamLoadContext {
     std::string qualified_user;
     std::string cloud_cluster;
 
+    // Fields for async processing (Scheme B: libevent deferred callback)
+    // These fields are set in _on_header() before execute_plan_fragment is called
+    // to avoid race conditions
+    struct event_base* event_base = nullptr;        // libevent event loop
+    HttpRequest* http_request = nullptr;            // HTTP request reference
+    StreamLoadAction* stream_load_action = nullptr; // StreamLoadAction instance pointer

Review Comment:
   Raw pointer members without ownership semantics: The StreamLoadContext class 
now contains raw pointers (event_base, http_request, stream_load_action) that 
don't express ownership. These pointers are set to nullptr in free_handler_ctx 
but there's no clear lifetime management. If the HttpRequest or 
StreamLoadAction is destroyed while the async callback is pending or executing, 
accessing these pointers will cause undefined behavior. Consider using weak_ptr 
or another mechanism to safely check if these objects are still valid.
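
   As an illustration of the weak_ptr idea, here is a minimal, self-contained
sketch. RequestOwner, Context and deferred_callback are hypothetical names, not
the actual Doris types, and the pattern assumes the owner object is managed by
a shared_ptr, which is not the case for HttpRequest today:
   ```cpp
   #include <iostream>
   #include <memory>

   // Hypothetical owner standing in for the HttpRequest / StreamLoadAction.
   struct RequestOwner {
       void finalize() { std::cout << "finalize request\n"; }
   };

   // The context stores a non-owning weak_ptr instead of a raw pointer.
   struct Context {
       std::weak_ptr<RequestOwner> owner;
   };

   // The deferred callback promotes the weak_ptr; if the owner is already gone,
   // lock() returns nullptr and the callback degrades to a no-op.
   void deferred_callback(Context& ctx) {
       if (auto owner = ctx.owner.lock()) {
           owner->finalize();
       } else {
           std::cout << "owner already destroyed, skip finalize\n";
       }
   }

   int main() {
       Context ctx;
       {
           auto owner = std::make_shared<RequestOwner>();
           ctx.owner = owner;
           deferred_callback(ctx); // owner alive: finalize runs
       }
       deferred_callback(ctx); // owner destroyed: safely skipped
   }
   ```
   Unlike a raw-pointer null check, the promotion in lock() is atomic with
respect to the owner's destruction, so there is no window in which the callback
can observe a dangling pointer.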



##########
be/src/http/action/stream_load.cpp:
##########
@@ -176,26 +186,38 @@ Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
         RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx, mocked));
     }
 
-    // wait stream load finish
-    RETURN_IF_ERROR(ctx->future.get());
+    // Check if event_base is set (should be set in _on_header)
+    // If not set, fallback to sync wait
+    if (ctx->event_base == nullptr || ctx->http_request == nullptr ||
+        ctx->stream_load_action == nullptr) {
+        // event_base not set (should not happen), fallback to sync wait
+        LOG(WARNING) << "event_base not set, fallback to sync wait, ctx=" << ctx->id.to_string();
+        RETURN_IF_ERROR(ctx->future.get());
 
-    if (ctx->group_commit) {
-        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
+        // Continue with original logic
+        if (ctx->group_commit) {
+            LOG(INFO) << "skip commit because this is group commit, pipe_id="
+                      << ctx->id.to_string();
+            return Status::OK();
+        }
+
+        if (ctx->two_phase_commit) {
+            int64_t pre_commit_start_time = MonotonicNanos();
+            RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
+            ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
+        } else {
+            int64_t commit_and_publish_start_time = MonotonicNanos();
+            RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+            ctx->commit_and_publish_txn_cost_nanos =
+                    MonotonicNanos() - commit_and_publish_start_time;
+            g_stream_load_commit_and_publish_latency_ms
+                    << ctx->commit_and_publish_txn_cost_nanos / 1000000;
+        }
         return Status::OK();
     }
 
-    if (ctx->two_phase_commit) {
-        int64_t pre_commit_start_time = MonotonicNanos();
-        RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
-        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
-    } else {
-        // If put file success we need commit this load
-        int64_t commit_and_publish_start_time = MonotonicNanos();
-        RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
-        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
-        g_stream_load_commit_and_publish_latency_ms
-                << ctx->commit_and_publish_txn_cost_nanos / 1000000;
-    }
+    // event_base is set, use async mode, return directly
+    // Future completion will trigger callback in stream_load_executor

Review Comment:
   Incorrect comment: The comment states "Future completion will trigger 
callback in stream_load_executor" but this is misleading. The callback is not 
triggered by future completion - it's triggered immediately by event_base_once 
with a zero timeout. The future/promise mechanism is actually bypassed in async 
mode, as noted in the code itself (lines 197-198, 202, 204).
   ```suggestion
       // stream_load_executor will be invoked via event_base_once (zero-timeout) callback
   ```



##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -123,26 +161,75 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
             }
         }
         ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-        ctx->promise.set_value(*status);
 
-        if (!status->ok() && ctx->body_sink != nullptr) {
-            // In some cases, the load execution is exited early.
-            // For example, when max_filter_ratio is 0 and illegal data is encountered
-            // during stream loading, the entire load process is terminated early.
-            // However, the http connection may still be sending data to stream_load_pipe
-            // and waiting for it to be consumed.
-            // Therefore, we need to actively cancel to end the pipe.
-            ctx->body_sink->cancel(status->to_string());
+        // If event_base is set, use async callback; otherwise use sync mode
+        if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
+            ctx->stream_load_action != nullptr) {
+            // Async mode: use event_base_once to trigger callback in libevent thread
+            // Note: exec_fragment callback should only be called once (after fragment execution)
+            // If it's called multiple times, there's a bug that should be fixed
+
+            // Save state for callback (value copy to ensure thread safety)
+            Status fragment_status = *status;
+
+            // Save other state needed in callback
+            bool need_rollback = ctx->need_rollback;
+            bool need_commit_self = ctx->need_commit_self;
+            bool body_sink_cancelled = (ctx->body_sink != nullptr && ctx->body_sink->cancelled());
+
+            // Allocate callback data on heap since event_base_once may not execute immediately
+            auto* callback_data =
+                    new StreamLoadAsyncCallbackData {.ctx = ctx,
+                                                     .fragment_status = fragment_status,
+                                                     .need_rollback = need_rollback,
+                                                     .need_commit_self = need_commit_self,
+                                                     .body_sink_cancelled = body_sink_cancelled};

Review Comment:
   Unsafe shared_ptr usage across threads: The callback_data contains a 
shared_ptr to StreamLoadContext (ctx). This shared_ptr is created in one thread 
(fragment manager thread) and the callback_data is then passed to potentially 
execute in another thread (libevent thread). While shared_ptr itself is 
thread-safe for reference counting, the callback deletes the callback_data 
(line 92) which will decrement the ref count. If this happens concurrently with 
other operations on the shared_ptr in other threads without proper 
synchronization, it could lead to issues. More critically, the ctx object 
members (like http_request, stream_load_action) are being accessed and modified 
across threads without synchronization.
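
   One way to make the hand-off explicit is to let exactly one side (the HTTP
close path or the async callback) win the right to finalize, for example with
an atomic flag on the context. A minimal sketch only; Context and
try_claim_finalize are hypothetical names, not part of the existing code:
   ```cpp
   #include <atomic>
   #include <iostream>
   #include <memory>
   #include <thread>

   // Hypothetical context: a single atomic flag decides which side is allowed
   // to perform the one-shot finalization work.
   struct Context {
       std::atomic<bool> finalized{false};
   };

   // Returns true for exactly one caller, no matter which thread gets here first.
   bool try_claim_finalize(Context& ctx) {
       bool expected = false;
       return ctx.finalized.compare_exchange_strong(expected, true);
   }

   int main() {
       auto ctx = std::make_shared<Context>();

       std::thread close_path([ctx] {
           if (try_claim_finalize(*ctx)) {
               std::cout << "close path finalizes\n";
           }
       });
       std::thread callback_path([ctx] {
           if (try_claim_finalize(*ctx)) {
               std::cout << "async callback finalizes\n";
           }
       });

       close_path.join();
       callback_path.join();
   }
   ```
   compare_exchange_strong flips the flag exactly once, so even if both paths
race, only one of them performs the finalize work, and the shared_ptr copies
keep the context alive until both threads are done.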



##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -54,6 +57,41 @@
 namespace doris {
 using namespace ErrorCode;
 
+// Callback data structure for async stream load processing
+struct StreamLoadAsyncCallbackData {
+    std::shared_ptr<StreamLoadContext> ctx;
+    Status fragment_status;
+    bool need_rollback;
+    bool need_commit_self;
+    bool body_sink_cancelled;
+};
+
+// C-style callback function for event_base_once
+static void stream_load_async_callback(evutil_socket_t, short, void* arg) {
+    auto* data = static_cast<StreamLoadAsyncCallbackData*>(arg);
+    // This callback executes in libevent thread
+    // Check validity again (request may have been closed during wait)
+    if (data->ctx->stream_load_action == nullptr || data->ctx->http_request == nullptr) {
+        LOG(WARNING) << "stream_load_action or http_request is null in callback, "
+                     << "request may be closed, ctx=" << data->ctx->id.to_string();
+        // Note: No need to set promise here because:
+        // 1. If request is closed, free_handler_ctx should have set promise (to Cancelled)
+        // 2. In async mode, _handle() has returned, won't wait for promise
+        // 3. If promise is already set, setting again will throw exception
+        // 4. Even if promise is not set, setting here is meaningless since request is closed
+        delete data;
+        return;
+    }
+
+    // Continue with subsequent logic
+    data->ctx->stream_load_action->continue_handle_after_future(
+            data->ctx, data->fragment_status, data->need_rollback, data->need_commit_self,
+            data->body_sink_cancelled);

Review Comment:
   Race condition: The validity check of ctx->stream_load_action and 
ctx->http_request happens in the callback (libevent thread), but these fields 
can be set to nullptr in free_handler_ctx (lines 441-443 in stream_load.cpp) 
which may run concurrently. Without synchronization, there's a 
time-of-check-time-of-use (TOCTOU) race where the pointers could become null 
between the check on line 74 and their use on lines 87-89.
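
   One sketch of closing that window is to do the check and the use under a
mutex that the close path also takes before nulling the pointers. The names
below are hypothetical, and this is only sufficient if free_handler_ctx clears
the pointer, and the underlying request is torn down, under or after the same
lock:
   ```cpp
   #include <iostream>
   #include <mutex>

   // Hypothetical stand-in for the HTTP request object.
   struct Request {
       void reply() { std::cout << "send reply\n"; }
   };

   struct Context {
       std::mutex lock;
       Request* http_request = nullptr; // guarded by `lock`
   };

   // Close path: clears the pointer while holding the lock.
   void close_path(Context& ctx) {
       std::lock_guard<std::mutex> guard(ctx.lock);
       ctx.http_request = nullptr;
   }

   // Callback path: the check and the use happen under the same lock, so the
   // pointer cannot be cleared between them.
   void callback_path(Context& ctx) {
       std::lock_guard<std::mutex> guard(ctx.lock);
       if (ctx.http_request == nullptr) {
           std::cout << "request already closed, skip\n";
           return;
       }
       ctx.http_request->reply();
   }

   int main() {
       Context ctx;
       Request req;
       ctx.http_request = &req;
       callback_path(ctx); // replies
       close_path(ctx);    // request goes away under the lock
       callback_path(ctx); // safely skips
   }
   ```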



##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -54,6 +57,41 @@
 namespace doris {
 using namespace ErrorCode;
 
+// Callback data structure for async stream load processing
+struct StreamLoadAsyncCallbackData {
+    std::shared_ptr<StreamLoadContext> ctx;
+    Status fragment_status;
+    bool need_rollback;
+    bool need_commit_self;
+    bool body_sink_cancelled;
+};
+
+// C-style callback function for event_base_once
+static void stream_load_async_callback(evutil_socket_t, short, void* arg) {
+    auto* data = static_cast<StreamLoadAsyncCallbackData*>(arg);
+    // This callback executes in libevent thread
+    // Check validity again (request may have been closed during wait)
+    if (data->ctx->stream_load_action == nullptr || data->ctx->http_request == nullptr) {
+        LOG(WARNING) << "stream_load_action or http_request is null in callback, "
+                     << "request may be closed, ctx=" << data->ctx->id.to_string();
+        // Note: No need to set promise here because:
+        // 1. If request is closed, free_handler_ctx should have set promise (to Cancelled)
+        // 2. In async mode, _handle() has returned, won't wait for promise
+        // 3. If promise is already set, setting again will throw exception
+        // 4. Even if promise is not set, setting here is meaningless since request is closed

Review Comment:
   Unclear control flow comment: The comment states "No need to set promise 
here because..." but then lists several conflicting reasons. Specifically, 
point 2 says "_handle() has returned, won't wait for promise" but point 4 says 
"Even if promise is not set, setting here is meaningless since request is 
closed". This creates confusion about the actual contract - is the promise 
required to be set or not? The comment should clearly state the expected 
behavior.
   ```suggestion
           // Note: Do not touch the associated promise here:
           // 1. If the request has already been closed, free_handler_ctx is responsible for
           //    fulfilling the promise (typically with a Cancelled status).
           // 2. In async mode, _handle() has already returned and no caller is waiting on this
           //    callback to satisfy the promise.
           // 3. The promise may already have been set; attempting to set it again would be unsafe
           //    and could throw an exception.
   ```



##########
be/src/http/action/stream_load.cpp:
##########
@@ -924,4 +979,110 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     return Status::OK();
 }
 
+void StreamLoadAction::continue_handle_after_future(std::shared_ptr<StreamLoadContext> ctx,
+                                                    Status fragment_status, bool need_rollback,
+                                                    bool need_commit_self,
+                                                    bool body_sink_cancelled) {
+    // Handle body_sink cancel if needed
+    if (!fragment_status.ok() && ctx->body_sink != nullptr) {
+        ctx->body_sink->cancel(fragment_status.to_string());
+    }
+
+    // Handle need_commit_self case
+    if (need_commit_self && ctx->body_sink != nullptr) {
+        if (body_sink_cancelled || !fragment_status.ok()) {
+            ctx->status = fragment_status;
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            _finalize_request(ctx->http_request, ctx);
+            return;
+        } else {
+            // Execute commit in callback (this was originally in exec_fragment)
+            static_cast<void>(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+            _finalize_request(ctx->http_request, ctx);
+            return;
+        }
+    }
+
+    // Continue with original subsequent logic
+    if (!fragment_status.ok()) {
+        ctx->status = fragment_status;
+        if (need_rollback) {
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            ctx->need_rollback = false;
+        }
+        _finalize_request(ctx->http_request, ctx);
+        return;
+    }
+
+    if (ctx->group_commit) {
+        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
+        ctx->status = Status::OK();
+        _finalize_request(ctx->http_request, ctx);
+        return;
+    }
+
+    if (ctx->two_phase_commit) {
+        int64_t pre_commit_start_time = MonotonicNanos();
+        Status commit_st = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
+        ctx->status = commit_st;
+    } else {
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        Status commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
+        g_stream_load_commit_and_publish_latency_ms
+                << ctx->commit_and_publish_txn_cost_nanos / 1000000;
+        ctx->status = commit_st;
+    }
+
+    _finalize_request(ctx->http_request, ctx);
+}
+
+void StreamLoadAction::_finalize_request(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) {
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->need_rollback) {
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            ctx->need_rollback = false;
+        }
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+    }
+
+    auto str = ctx->to_json();
+    str = str + '\n';
+    HttpChannel::send_reply(req, str);

Review Comment:
   Potential use-after-free: In _finalize_request, ctx->http_request is passed 
as a parameter and used on line 1056 to send a reply. However, the http_request 
pointer in ctx may have been set to nullptr by free_handler_ctx (line 441) 
which could be called if the client disconnects. While the parameter is passed 
by value, if free_handler_ctx is called concurrently or the underlying 
HttpRequest object is destroyed, this will cause a crash.



##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -123,26 +161,75 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
             }
         }
         ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-        ctx->promise.set_value(*status);
 
-        if (!status->ok() && ctx->body_sink != nullptr) {
-            // In some cases, the load execution is exited early.
-            // For example, when max_filter_ratio is 0 and illegal data is encountered
-            // during stream loading, the entire load process is terminated early.
-            // However, the http connection may still be sending data to stream_load_pipe
-            // and waiting for it to be consumed.
-            // Therefore, we need to actively cancel to end the pipe.
-            ctx->body_sink->cancel(status->to_string());
+        // If event_base is set, use async callback; otherwise use sync mode
+        if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
+            ctx->stream_load_action != nullptr) {
+            // Async mode: use event_base_once to trigger callback in libevent thread
+            // Note: exec_fragment callback should only be called once (after fragment execution)
+            // If it's called multiple times, there's a bug that should be fixed
+
+            // Save state for callback (value copy to ensure thread safety)
+            Status fragment_status = *status;
+
+            // Save other state needed in callback
+            bool need_rollback = ctx->need_rollback;
+            bool need_commit_self = ctx->need_commit_self;
+            bool body_sink_cancelled = (ctx->body_sink != nullptr && ctx->body_sink->cancelled());

Review Comment:
   Double access to body_sink: The code reads ctx->body_sink and checks if it's 
cancelled on line 178, but this happens in the fragment manager thread. Later, 
the callback may access body_sink in a different thread (libevent thread). If 
body_sink is not thread-safe or can be modified/destroyed between these 
accesses, this could lead to undefined behavior. The body_sink_cancelled flag 
is captured but the sink itself may still be accessed in 
continue_handle_after_future.
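
   If body_sink is held as a shared_ptr (as the ctx->body_sink != nullptr
checks suggest), one option is to copy that shared_ptr into the callback data
when it is built, so the callback works on its own copy and never re-reads the
context field from another thread. A rough sketch with hypothetical stand-in
types:
   ```cpp
   #include <iostream>
   #include <memory>
   #include <string>

   // Hypothetical sink type; only the ownership pattern matters here.
   struct Sink {
       void cancel(const std::string& reason) { std::cout << "cancel: " << reason << "\n"; }
   };

   struct CallbackData {
       // A shared_ptr copy captured when the data is built keeps the sink
       // alive for the callback and avoids re-reading ctx->body_sink later.
       std::shared_ptr<Sink> sink;
       bool fragment_failed;
       std::string reason;
   };

   void callback(const CallbackData& data) {
       // Touches only the captured copy, not a shared mutable context field.
       if (data.fragment_failed && data.sink != nullptr) {
           data.sink->cancel(data.reason);
       }
   }

   int main() {
       CallbackData data{std::make_shared<Sink>(), true, "fragment failed"};
       callback(data);
   }
   ```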



##########
be/src/http/action/stream_load.cpp:
##########
@@ -924,4 +979,110 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
     return Status::OK();
 }
 
+void StreamLoadAction::continue_handle_after_future(std::shared_ptr<StreamLoadContext> ctx,
+                                                    Status fragment_status, bool need_rollback,
+                                                    bool need_commit_self,
+                                                    bool body_sink_cancelled) {
+    // Handle body_sink cancel if needed
+    if (!fragment_status.ok() && ctx->body_sink != nullptr) {
+        ctx->body_sink->cancel(fragment_status.to_string());
+    }
+
+    // Handle need_commit_self case
+    if (need_commit_self && ctx->body_sink != nullptr) {
+        if (body_sink_cancelled || !fragment_status.ok()) {
+            ctx->status = fragment_status;
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            _finalize_request(ctx->http_request, ctx);
+            return;
+        } else {
+            // Execute commit in callback (this was originally in exec_fragment)
+            static_cast<void>(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
+            _finalize_request(ctx->http_request, ctx);
+            return;
+        }
+    }
+
+    // Continue with original subsequent logic
+    if (!fragment_status.ok()) {
+        ctx->status = fragment_status;
+        if (need_rollback) {
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            ctx->need_rollback = false;
+        }
+        _finalize_request(ctx->http_request, ctx);
+        return;
+    }
+
+    if (ctx->group_commit) {
+        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
+        ctx->status = Status::OK();
+        _finalize_request(ctx->http_request, ctx);
+        return;
+    }
+
+    if (ctx->two_phase_commit) {
+        int64_t pre_commit_start_time = MonotonicNanos();
+        Status commit_st = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
+        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
+        ctx->status = commit_st;
+    } else {
+        int64_t commit_and_publish_start_time = MonotonicNanos();
+        Status commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
+        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
+        g_stream_load_commit_and_publish_latency_ms
+                << ctx->commit_and_publish_txn_cost_nanos / 1000000;
+        ctx->status = commit_st;
+    }
+
+    _finalize_request(ctx->http_request, ctx);

Review Comment:
   Potential null pointer dereference: In continue_handle_after_future, 
ctx->http_request is accessed on lines 996, 1001, 1013, 1020, and 1038, but it 
can be set to nullptr in free_handler_ctx (line 441). Even though there's a 
check on line 74 of stream_load_executor.cpp, there's no guarantee that 
http_request won't become null between that check and when 
continue_handle_after_future is called, creating a potential crash.



##########
be/src/runtime/stream_load/stream_load_executor.cpp:
##########
@@ -123,26 +161,75 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
             }
         }
         ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-        ctx->promise.set_value(*status);
 
-        if (!status->ok() && ctx->body_sink != nullptr) {
-            // In some cases, the load execution is exited early.
-            // For example, when max_filter_ratio is 0 and illegal data is encountered
-            // during stream loading, the entire load process is terminated early.
-            // However, the http connection may still be sending data to stream_load_pipe
-            // and waiting for it to be consumed.
-            // Therefore, we need to actively cancel to end the pipe.
-            ctx->body_sink->cancel(status->to_string());
+        // If event_base is set, use async callback; otherwise use sync mode
+        if (ctx->event_base != nullptr && ctx->http_request != nullptr &&
+            ctx->stream_load_action != nullptr) {
+            // Async mode: use event_base_once to trigger callback in libevent thread
+            // Note: exec_fragment callback should only be called once (after fragment execution)
+            // If it's called multiple times, there's a bug that should be fixed
+
+            // Save state for callback (value copy to ensure thread safety)
+            Status fragment_status = *status;
+
+            // Save other state needed in callback
+            bool need_rollback = ctx->need_rollback;
+            bool need_commit_self = ctx->need_commit_self;
+            bool body_sink_cancelled = (ctx->body_sink != nullptr && ctx->body_sink->cancelled());
+
+            // Allocate callback data on heap since event_base_once may not execute immediately
+            auto* callback_data =
+                    new StreamLoadAsyncCallbackData {.ctx = ctx,
+                                                     .fragment_status = fragment_status,
+                                                     .need_rollback = need_rollback,
+                                                     .need_commit_self = need_commit_self,
+                                                     .body_sink_cancelled = body_sink_cancelled};
+
+            // Use event_base_once to trigger callback in libevent thread
+            // This is a one-shot event, no timer needed
+            struct timeval tv {};
+            tv.tv_sec = 0;
+            tv.tv_usec = 0; // Trigger immediately
+            int ret = event_base_once(ctx->event_base, -1, EV_TIMEOUT, stream_load_async_callback,
+                                      callback_data, &tv);
+            if (ret != 0) {
+                // event_base_once failed, log error
+                // Note: In async mode, no code is waiting for promise (_handle() has returned),
+                // so no need to set promise
+                LOG(ERROR) << "event_base_once failed, cannot send async callback, ctx="
+                           << ctx->id.to_string() << ", errno=" << errno;
+                // Note: Cannot send HTTP response in this case, request may be invalid
+                // No need to set promise, as no code is waiting in async mode

Review Comment:
   Inconsistent error handling: When event_base_once fails (line 195), the 
error is logged but no corrective action is taken. The request will be left in 
an incomplete state - the fragment execution has finished but no response will 
be sent to the client, and the transaction state is unclear. At minimum, this 
should attempt to finalize the request or set an error status that will be 
handled elsewhere.
   ```suggestion
                   // event_base_once failed, log error and fall back to executing callback
                   // synchronously in the current thread to avoid leaving the request incomplete.
                   LOG(ERROR) << "event_base_once failed, cannot send async callback, ctx="
                              << ctx->id.to_string() << ", errno=" << errno;
                   // Best-effort fallback: directly invoke the callback so that the normal
                   // completion logic (including HTTP response / txn handling) still runs.
                   // The callback will delete callback_data.
                   stream_load_async_callback(-1, EV_TIMEOUT, callback_data);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

