This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d2922aa4031 [bugfix](arrowflight) should call done run in on_xxx
method to make work in async mode (#60282)
d2922aa4031 is described below
commit d2922aa4031818bed041a271d1315846c08182d4
Author: yiguolei <[email protected]>
AuthorDate: Thu Jan 29 08:51:37 2026 +0800
[bugfix](arrowflight) should call done run in on_xxx method to make work in
async mode (#60282)
There is logic here similar to how our FE fetches data from the BE: if the
data is not yet available, the request is added to a waiting queue.
However, even after the request has been added to the waiting queue, the
brpc::ClosureGuard still invokes the done closure's Run method automatically.
At this point, the data received by the client is actually undefined,
yet the status accompanying this undefined value is unexpectedly OK. As a
result, the program attempts to read the block value, leading to a dirty read.
---
be/src/service/internal_service.cpp | 3 +--
be/src/vec/sink/varrow_flight_result_writer.cpp | 4 +++
be/src/vec/sink/varrow_flight_result_writer.h | 4 ++-
.../vec/sink/arrow_result_block_buffer_test.cpp | 29 +++++++++++++++++-----
be/test/vec/sink/get_result_batch_test.cpp | 6 +++--
5 files changed, 35 insertions(+), 11 deletions(-)
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index f68716eebc8..f118e8b4122 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -671,8 +671,7 @@ void
PInternalService::fetch_arrow_data(google::protobuf::RpcController* control
PFetchArrowDataResult* result,
google::protobuf::Closure* done) {
bool ret = _arrow_flight_work_pool.try_offer([request, result, done]() {
- brpc::ClosureGuard closure_guard(done);
- auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result);
+ auto ctx = vectorized::GetArrowResultBatchCtx::create_shared(result,
done);
TUniqueId unique_id = UniqueId(request->finst_id()).to_thrift(); //
query_id or instance_id
std::shared_ptr<vectorized::ArrowFlightResultBlockBuffer> arrow_buffer;
auto st = ExecEnv::GetInstance()->result_mgr()->find_buffer(unique_id,
arrow_buffer);
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index b8280785546..6c9c393ae4a 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -33,6 +33,7 @@ namespace doris::vectorized {
void GetArrowResultBatchCtx::on_failure(const Status& status) {
DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
status.to_protobuf(_result->mutable_status());
+ _done->Run();
}
void GetArrowResultBatchCtx::on_close(int64_t packet_seq, int64_t /*
returned_rows */) {
@@ -40,6 +41,7 @@ void GetArrowResultBatchCtx::on_close(int64_t packet_seq,
int64_t /* returned_ro
status.to_protobuf(_result->mutable_status());
_result->set_packet_seq(packet_seq);
_result->set_eos(true);
+ _done->Run();
}
Status GetArrowResultBatchCtx::on_data(const
std::shared_ptr<vectorized::Block>& block,
@@ -72,6 +74,8 @@ Status GetArrowResultBatchCtx::on_data(const
std::shared_ptr<vectorized::Block>&
_result->clear_block();
}
st.to_protobuf(_result->mutable_status());
+
+ _done->Run();
return Status::OK();
}
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h
b/be/src/vec/sink/varrow_flight_result_writer.h
index f84c3a12fa8..d1f5e21404a 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -35,7 +35,8 @@ class GetArrowResultBatchCtx {
public:
using ResultType = vectorized::Block;
ENABLE_FACTORY_CREATOR(GetArrowResultBatchCtx)
- GetArrowResultBatchCtx(PFetchArrowDataResult* result) : _result(result) {}
+ GetArrowResultBatchCtx(PFetchArrowDataResult* result,
google::protobuf::Closure* done)
+ : _result(result), _done(done) {}
#ifdef BE_TEST
GetArrowResultBatchCtx() = default;
#endif
@@ -53,6 +54,7 @@ private:
int32_t _max_msg_size = std::numeric_limits<int32_t>::max();
#endif
PFetchArrowDataResult* _result = nullptr;
+ google::protobuf::Closure* _done = nullptr;
};
class ArrowFlightResultBlockBuffer final : public
ResultBlockBuffer<GetArrowResultBatchCtx> {
diff --git a/be/test/vec/sink/arrow_result_block_buffer_test.cpp
b/be/test/vec/sink/arrow_result_block_buffer_test.cpp
index 48718361185..7fb1d8adf1f 100644
--- a/be/test/vec/sink/arrow_result_block_buffer_test.cpp
+++ b/be/test/vec/sink/arrow_result_block_buffer_test.cpp
@@ -34,6 +34,14 @@
namespace doris::vectorized {
+class MockClosure : public google::protobuf::Closure {
+public:
+ MockClosure() {}
+ MockClosure(std::function<void()> cb) : _cb(cb) {}
+ void Run() override { _cb(); }
+
+ std::function<void()> _cb;
+};
class ArrowResultBlockBufferTest : public ::testing::Test {
public:
ArrowResultBlockBufferTest() = default;
@@ -44,8 +52,9 @@ class MockGetArrowResultBatchCtx : public
GetArrowResultBatchCtx {
public:
ENABLE_FACTORY_CREATOR(MockGetArrowResultBatchCtx)
MockGetArrowResultBatchCtx(std::function<void()> fail_cb,
std::function<void()> close_cb,
- std::function<void()> data_cb,
PFetchArrowDataResult* result)
- : GetArrowResultBatchCtx(result),
+ std::function<void()> data_cb,
PFetchArrowDataResult* result,
+ google::protobuf::Closure* done)
+ : GetArrowResultBatchCtx(result, done),
_fail_cb(fail_cb),
_close_cb(close_cb),
_data_cb(data_cb) {}
@@ -78,9 +87,11 @@ TEST_F(ArrowResultBlockBufferTest,
TestArrowResultBlockBuffer) {
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema,
buffer_size);
buffer.set_dependency(ins_id, dep);
PFetchArrowDataResult presult;
+
+ MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
std::shared_ptr<GetArrowResultBatchCtx> ctx =
MockGetArrowResultBatchCtx::create_shared(
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
- [&]() -> void { data = true; }, &presult);
+ [&]() -> void { data = true; }, &presult, &done);
{
auto num_rows = 2;
@@ -201,9 +212,11 @@ TEST_F(ArrowResultBlockBufferTest,
TestCancelArrowResultBlockBuffer) {
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema,
buffer_size);
buffer.set_dependency(ins_id, dep);
PFetchArrowDataResult presult;
+
+ MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
std::shared_ptr<GetArrowResultBatchCtx> ctx =
MockGetArrowResultBatchCtx::create_shared(
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
- [&]() -> void { data = true; }, &presult);
+ [&]() -> void { data = true; }, &presult, &done);
{
EXPECT_TRUE(buffer.get_batch(ctx).ok());
@@ -273,9 +286,11 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema,
buffer_size);
buffer.set_dependency(ins_id, dep);
PFetchArrowDataResult presult;
+
+ MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
std::shared_ptr<GetArrowResultBatchCtx> ctx =
MockGetArrowResultBatchCtx::create_shared(
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
- [&]() -> void { data = true; }, &presult);
+ [&]() -> void { data = true; }, &presult, &done);
{
EXPECT_TRUE(buffer.get_batch(ctx).ok());
@@ -330,9 +345,11 @@ TEST_F(ArrowResultBlockBufferTest,
TestArrowResultSerializeFailure) {
ArrowFlightResultBlockBuffer buffer(TUniqueId(), &state, schema,
buffer_size);
buffer.set_dependency(ins_id, dep);
PFetchArrowDataResult presult;
+
+ MockClosure done([&]() -> void { std::cout << "cb" << std::endl; });
std::shared_ptr<GetArrowResultBatchCtx> ctx =
MockGetArrowResultBatchCtx::create_shared(
[&]() -> void { fail = true; }, [&]() -> void { close = true; },
- [&]() -> void { data = true; }, &presult);
+ [&]() -> void { data = true; }, &presult, &done);
{
auto num_rows = 2;
diff --git a/be/test/vec/sink/get_result_batch_test.cpp
b/be/test/vec/sink/get_result_batch_test.cpp
index 82eff9dd289..b820bd43ecf 100644
--- a/be/test/vec/sink/get_result_batch_test.cpp
+++ b/be/test/vec/sink/get_result_batch_test.cpp
@@ -40,10 +40,10 @@ public:
class MockClosure : public google::protobuf::Closure {
public:
+ MockClosure() {}
MockClosure(std::function<void()> cb) : _cb(cb) {}
void Run() override { _cb(); }
-private:
std::function<void()> _cb;
};
@@ -126,7 +126,9 @@ TEST_F(GetResultBatchCtxTest, TestGetResultBatchCtx) {
TEST_F(GetResultBatchCtxTest, TestGetArrowResultBatchCtx) {
PFetchArrowDataResult result;
- auto ctx = GetArrowResultBatchCtx::create_shared(&result);
+ MockClosure closure;
+ closure._cb = [&]() { std::cout << "cb" << std::endl; };
+ auto ctx = GetArrowResultBatchCtx::create_shared(&result, &closure);
{
// on_failure
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]