This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new ea61206233e [pick](branch-2.1) pick #43281 (#44020)
ea61206233e is described below
commit ea61206233eda8a26c6931543e6914cefa852fb3
Author: Xinyi Zou <[email protected]>
AuthorDate: Sat Nov 16 21:53:21 2024 +0800
[pick](branch-2.1) pick #43281 (#44020)
pick #43281
---
be/src/common/config.cpp | 7 +
be/src/common/config.h | 6 +
be/src/pipeline/exec/result_file_sink_operator.cpp | 2 +-
be/src/pipeline/exec/result_sink_operator.cpp | 12 +-
be/src/runtime/buffer_control_block.cpp | 236 ++++++++++++++---
be/src/runtime/buffer_control_block.h | 79 +++++-
be/src/runtime/result_buffer_mgr.cpp | 78 +++---
be/src/runtime/result_buffer_mgr.h | 29 +-
.../arrow_flight/arrow_flight_batch_reader.cpp | 291 +++++++++++++++++++--
.../arrow_flight/arrow_flight_batch_reader.h | 69 ++++-
be/src/service/arrow_flight/flight_sql_service.cpp | 57 ++--
be/src/service/internal_service.cpp | 60 ++++-
be/src/service/internal_service.h | 5 +
be/src/util/arrow/row_batch.cpp | 12 +-
be/src/util/arrow/row_batch.h | 12 +-
be/src/util/arrow/utils.cpp | 3 +-
be/src/util/doris_metrics.h | 5 +
be/src/vec/sink/varrow_flight_result_writer.cpp | 64 ++---
be/src/vec/sink/varrow_flight_result_writer.h | 18 +-
be/src/vec/sink/vmemory_scratch_sink.cpp | 2 +-
be/src/vec/sink/vresult_file_sink.cpp | 2 +-
be/src/vec/sink/vresult_sink.cpp | 11 +-
be/test/runtime/result_buffer_mgr_test.cpp | 13 +-
.../arrowflight/DorisFlightSqlProducer.java | 26 +-
.../arrowflight/FlightSqlConnectProcessor.java | 16 +-
gensrc/proto/internal_service.proto | 16 ++
26 files changed, 881 insertions(+), 250 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c239dc3e72d..d5b67c2c128 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -64,6 +64,7 @@ DEFINE_Int32(brpc_port, "8060");
DEFINE_Int32(arrow_flight_sql_port, "-1");
DEFINE_mString(public_access_ip, "");
+DEFINE_Int32(public_access_port, "-1");
// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
@@ -524,6 +525,8 @@ DEFINE_Int32(brpc_heavy_work_pool_threads, "-1");
DEFINE_Int32(brpc_light_work_pool_threads, "-1");
DEFINE_Int32(brpc_heavy_work_pool_max_queue_size, "-1");
DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
+DEFINE_Int32(brpc_arrow_flight_work_pool_threads, "-1");
+DEFINE_Int32(brpc_arrow_flight_work_pool_max_queue_size, "-1");
//Enable brpc builtin services, see:
//https://brpc.apache.org/docs/server/basics/#disable-built-in-services-completely
@@ -646,7 +649,11 @@ DEFINE_Int32(load_process_safe_mem_permit_percent, "5");
// result buffer cancelled time (unit: second)
DEFINE_mInt32(result_buffer_cancelled_interval_time, "300");
+// arrow flight result sink buffer rows size, default 4096 * 8
DEFINE_mInt32(arrow_flight_result_sink_buffer_size_rows, "32768");
+// The timeout for ADBC Client to wait for data using arrow flight reader.
+// If the query is very complex and no result is generated after this time,
consider increasing this timeout.
+DEFINE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms, "300000");
// the increased frequency of priority for remaining tasks in
BlockingPriorityQueue
DEFINE_mInt32(priority_queue_remaining_tasks_increased_frequency, "512");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 289c56464f3..aca5b6b829a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -104,6 +104,7 @@ DECLARE_Int32(arrow_flight_sql_port);
// For ADBC client fetch result, default is empty, the ADBC client uses the
backend ip to fetch the result.
// If ADBC client cannot access the backend ip, can set public_access_ip to
modify the fetch result ip.
DECLARE_mString(public_access_ip);
+DECLARE_Int32(public_access_port);
// the number of bthreads for brpc, the default value is set to -1,
// which means the number of bthreads is #cpu-cores
@@ -580,6 +581,8 @@ DECLARE_Int32(brpc_heavy_work_pool_threads);
DECLARE_Int32(brpc_light_work_pool_threads);
DECLARE_Int32(brpc_heavy_work_pool_max_queue_size);
DECLARE_Int32(brpc_light_work_pool_max_queue_size);
+DECLARE_Int32(brpc_arrow_flight_work_pool_threads);
+DECLARE_Int32(brpc_arrow_flight_work_pool_max_queue_size);
// The maximum amount of data that can be processed by a stream load
DECLARE_mInt64(streaming_load_max_mb);
@@ -701,6 +704,9 @@ DECLARE_mInt32(result_buffer_cancelled_interval_time);
// arrow flight result sink buffer rows size, default 4096 * 8
DECLARE_mInt32(arrow_flight_result_sink_buffer_size_rows);
+// The timeout for ADBC Client to wait for data using arrow flight reader.
+// If the query is very complex and no result is generated after this time,
consider increasing this timeout.
+DECLARE_mInt32(arrow_flight_reader_brpc_controller_timeout_ms);
// the increased frequency of priority for remaining tasks in
BlockingPriorityQueue
DECLARE_mInt32(priority_queue_remaining_tasks_increased_frequency);
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 6fe0b7f9e25..60a46544300 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -115,7 +115,7 @@ Status ResultFileSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& i
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), p._buf_size, &_sender,
state->enable_pipeline_exec(),
- state->execution_timeout()));
+ state));
// create writer
_writer.reset(new (std::nothrow) vectorized::VFileResultWriter(
p._file_opts.get(), p._storage_type,
state->fragment_instance_id(),
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 8f33aa9bed2..d2dfa89cdd6 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -69,8 +69,7 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
// create sender
auto& p = _parent->cast<ResultSinkOperatorX>();
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
- state->fragment_instance_id(), p._result_sink_buffer_size_rows,
&_sender, true,
- state->execution_timeout()));
+ state->fragment_instance_id(), p._result_sink_buffer_size_rows,
&_sender, true, state));
((PipBufferControlBlock*)_sender.get())->set_dependency(_dependency->shared_from_this());
return Status::OK();
}
@@ -98,12 +97,11 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
- RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs,
&arrow_schema,
- state->timezone()));
-
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
- arrow_schema);
+ RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs,
&arrow_schema,
+ state->timezone()));
+ _sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
- _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
+ _sender.get(), _output_vexpr_ctxs, _profile));
break;
}
default:
diff --git a/be/src/runtime/buffer_control_block.cpp
b/be/src/runtime/buffer_control_block.cpp
index c61c98a324b..f2ac2780c9d 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -33,9 +33,11 @@
#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
#include "pipeline/exec/result_sink_operator.h"
-#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
+#include "util/runtime_profile.h"
+#include "util/string_util.h"
#include "util/thrift_util.h"
+#include "vec/core/block.h"
namespace doris {
@@ -93,14 +95,80 @@ void GetResultBatchCtx::on_data(const
std::unique_ptr<TFetchDataResult>& t_resul
delete this;
}
-BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)
+// Reports a non-OK `status` to the client through result->status, then frees
+// this ctx. Must only be called with an error status (checked by DCHECK).
+void GetArrowResultBatchCtx::on_failure(const Status& status) {
+ DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
+ status.to_protobuf(result->mutable_status());
+ delete this;
+}
+
+// Reports end of stream: OK status, eos=true and the final `packet_seq`,
+// then frees this ctx.
+void GetArrowResultBatchCtx::on_close(int64_t packet_seq) {
+ Status status;
+ status.to_protobuf(result->mutable_status());
+ result->set_packet_seq(packet_seq);
+ result->set_eos(true);
+ delete this;
+}
+
+// Fills the pending PFetchArrowDataResult with one packet, then frees this ctx.
+// A non-null `block` is serialized (with the given BE exec version and
+// compression type) into result->block; a null `block` is reported as an empty
+// batch. The timezone is attached only to the first packet (packet_seq == 0).
+// Serialization errors and messages over the 2GB protobuf limit are reported
+// through result->status.
+void GetArrowResultBatchCtx::on_data(
+ const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq, int be_exec_version,
+ segment_v2::CompressionTypePB fragement_transmission_compression_type, std::string timezone,
+ RuntimeProfile::Counter* serialize_batch_ns_timer,
+ RuntimeProfile::Counter* uncompressed_bytes_counter,
+ RuntimeProfile::Counter* compressed_bytes_counter) {
+ Status st = Status::OK();
+ // Branch on `block`, not on `result`: `result` is written on every path below
+ // (including the empty-batch branch), while `block` is the value that may be absent.
+ if (block != nullptr) {
+ size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ SCOPED_TIMER(serialize_batch_ns_timer);
+ st = block->serialize(be_exec_version, result->mutable_block(), &uncompressed_bytes,
+ &compressed_bytes, fragement_transmission_compression_type, false);
+ COUNTER_UPDATE(uncompressed_bytes_counter, uncompressed_bytes);
+ COUNTER_UPDATE(compressed_bytes_counter, compressed_bytes);
+ if (st.ok()) {
+ result->set_packet_seq(packet_seq);
+ result->set_eos(false);
+ if (packet_seq == 0) {
+ result->set_timezone(timezone);
+ }
+ } else {
+ result->clear_block();
+ result->set_packet_seq(packet_seq);
+ LOG(WARNING) << "PFetchArrowDataResult serialize failed, errmsg=" << st;
+ }
+ } else {
+ result->set_empty_batch(true);
+ result->set_packet_seq(packet_seq);
+ result->set_eos(false);
+ }
+
+ /// The size limit of proto buffer message is 2G
+ if (result->ByteSizeLong() > std::numeric_limits<int32_t>::max()) {
+ st = Status::InternalError("Message size exceeds 2GB: {}", result->ByteSizeLong());
+ result->clear_block();
+ }
+ st.to_protobuf(result->mutable_status());
+ delete this;
+}
+
+BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size,
RuntimeState* state)
: _fragment_id(id),
_is_close(false),
_is_cancelled(false),
_buffer_rows(0),
_buffer_limit(buffer_size),
- _packet_num(0) {
+ _packet_num(0),
+ _timezone(state->timezone()),
+ _timezone_obj(state->timezone_obj()),
+ _be_exec_version(state->be_exec_version()),
+ _fragement_transmission_compression_type(
+ state->fragement_transmission_compression_type()),
+ _profile("BufferControlBlock " + print_id(_fragment_id)) {
_query_statistics = std::make_unique<QueryStatistics>();
+ _serialize_batch_ns_timer = ADD_TIMER(&_profile, "SerializeBatchNsTime");
+ _uncompressed_bytes_counter = ADD_COUNTER(&_profile, "UncompressedBytes",
TUnit::BYTES);
+ _compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes",
TUnit::BYTES);
+ _mem_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::QUERY,
+ fmt::format("BufferControlBlock#FragmentInstanceId={}",
print_id(_fragment_id)));
}
BufferControlBlock::~BufferControlBlock() {
@@ -157,28 +225,29 @@ Status
BufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& result,
return Status::OK();
}
-Status BufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result) {
+// Hands one produced block to arrow flight readers. If a remote fetch RPC is
+// parked in _waiting_arrow_result_batch_rpc, the block is serialized straight
+// into the oldest parked ctx; otherwise it is queued and a reader blocked in
+// the waiting get_arrow_batch() overload is woken.
+// NOTE(review): unlike the removed code, this never waits on _data_removal when
+// _buffer_rows exceeds _buffer_limit, and on_data() serializes while _lock is
+// held. For the non-pipeline BufferControlBlock nothing visible here bounds the
+// queue — confirm this is intended.
+Status BufferControlBlock::add_arrow_batch(std::shared_ptr<vectorized::Block>& result) {
std::unique_lock<std::mutex> l(_lock);
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}
- int num_rows = result->num_rows();
-
- while ((!_arrow_flight_batch_queue.empty() && _buffer_rows > _buffer_limit) && !_is_cancelled) {
- _data_removal.wait_for(l, std::chrono::seconds(1));
- }
-
- if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
+ if (_waiting_arrow_result_batch_rpc.empty()) {
+ // TODO: Merge result into block to reduce rpc times
+ int num_rows = result->rows();
+ _arrow_flight_result_batch_queue.push_back(std::move(result));
+ _buffer_rows += num_rows;
+ // Only valid for get_arrow_batch(std::shared_ptr<vectorized::Block>,)
+ _arrow_data_arrival.notify_one();
+ } else {
+ auto* ctx = _waiting_arrow_result_batch_rpc.front();
+ _waiting_arrow_result_batch_rpc.pop_front();
+ ctx->on_data(result, _packet_num, _be_exec_version,
+ _fragement_transmission_compression_type, _timezone, _serialize_batch_ns_timer,
+ _uncompressed_bytes_counter, _compressed_bytes_counter);
+ _packet_num++;
}
- // TODO: merge RocordBatch, ToStructArray -> Make again
-
- _arrow_flight_batch_queue.push_back(std::move(result));
- _buffer_rows += num_rows;
- _data_arrival.notify_one();
return Status::OK();
}
@@ -211,37 +280,113 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {
_waiting_rpc.push_back(ctx);
}
-Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) {
+// Blocking fetch (local reader): waits, re-checking every 20ms, until a block is
+// queued or the buffer is closed or cancelled. On data, pops one block into
+// *result and copies the query timezone into `timezone_obj`. Once the buffer is
+// closed and drained it returns OK without assigning *result.
+Status BufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
+ cctz::time_zone& timezone_obj) {
std::unique_lock<std::mutex> l(_lock);
if (!_status.ok()) {
return _status;
}
if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
+ return Status::Cancelled(fmt::format("Cancelled ({})", print_id(_fragment_id)));
}
- while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
- _data_arrival.wait_for(l, std::chrono::seconds(1));
+ while (_arrow_flight_result_batch_queue.empty() && !_is_cancelled && !_is_close) {
+ _arrow_data_arrival.wait_for(l, std::chrono::milliseconds(20));
}
if (_is_cancelled) {
- return Status::Cancelled("Cancelled");
+ return Status::Cancelled(fmt::format("Cancelled ({})", print_id(_fragment_id)));
}
- if (!_arrow_flight_batch_queue.empty()) {
- *result = std::move(_arrow_flight_batch_queue.front());
- _arrow_flight_batch_queue.pop_front();
- _buffer_rows -= (*result)->num_rows();
- _data_removal.notify_one();
+ if (!_arrow_flight_result_batch_queue.empty()) {
+ *result = std::move(_arrow_flight_result_batch_queue.front());
+ _arrow_flight_result_batch_queue.pop_front();
+ timezone_obj = _timezone_obj;
+ _buffer_rows -= (*result)->rows();
_packet_num++;
return Status::OK();
}
// normal path end
if (_is_close) {
+ std::stringstream ss;
+ _profile.pretty_print(&ss);
+ VLOG_NOTICE << fmt::format(
+ "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
+ "packet_num={}, peak_memory_usage={}, profile={}",
+ print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
+ _mem_tracker->peak_consumption(), ss.str());
+ return Status::OK();
+ }
+ return Status::InternalError(
+ fmt::format("Get Arrow Batch Abnormal Ending ({})", print_id(_fragment_id)));
+}
+
+// Non-blocking fetch (remote reader over brpc). Answers `ctx` immediately with
+// an error, the next queued block, or end-of-stream. If no data is ready yet,
+// parks `ctx` until add_arrow_batch(), close() or cancel() answers it.
+void BufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
+ std::unique_lock<std::mutex> l(_lock);
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ if (!_status.ok()) {
+ ctx->on_failure(_status);
+ return;
+ }
+ if (_is_cancelled) {
+ ctx->on_failure(Status::Cancelled(fmt::format("Cancelled ({})", print_id(_fragment_id))));
+ return;
+ }
+
+ if (!_arrow_flight_result_batch_queue.empty()) {
+ // Move out of the queue: the element is popped immediately afterwards.
+ auto block = std::move(_arrow_flight_result_batch_queue.front());
+ _arrow_flight_result_batch_queue.pop_front();
+
+ ctx->on_data(block, _packet_num, _be_exec_version, _fragement_transmission_compression_type,
+ _timezone, _serialize_batch_ns_timer, _uncompressed_bytes_counter,
+ _compressed_bytes_counter);
+ _buffer_rows -= block->rows();
+ _packet_num++;
+ return;
+ }
+
+ // normal path end
+ if (_is_close) {
+ ctx->on_close(_packet_num);
+ std::stringstream ss;
+ _profile.pretty_print(&ss);
+ VLOG_NOTICE << fmt::format(
+ "BufferControlBlock finished, fragment_id={}, is_close={}, is_cancelled={}, "
+ "packet_num={}, peak_memory_usage={}, profile={}",
+ print_id(_fragment_id), _is_close, _is_cancelled, _packet_num,
+ _mem_tracker->peak_consumption(), ss.str());
+ return;
+ }
+ // no ready data, push ctx to waiting list
+ // NOTE(review): none of the GetArrowResultBatchCtx callbacks shown in this
+ // change call ctx->done->Run(). If the RPC handler completes the call as soon
+ // as this function returns, a parked ctx is answered later into an
+ // already-sent response. Confirm how the handler completes the RPC.
+ _waiting_arrow_result_batch_rpc.push_back(ctx);
+}
+
+// Publishes the result's arrow schema for later find_arrow_schema() calls.
+void BufferControlBlock::register_arrow_schema(const std::shared_ptr<arrow::Schema>& arrow_schema) {
+ std::lock_guard<std::mutex> l(_lock);
+ _arrow_schema = arrow_schema;
+}
+
+// Copies the registered arrow schema into *arrow_schema. It fails with:
+// - the buffer's error status;
+// - Cancelled;
+// - RuntimeError if the buffer closed without a schema;
+// - InternalError if no schema is registered yet.
+Status BufferControlBlock::find_arrow_schema(std::shared_ptr<arrow::Schema>* arrow_schema) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (!_status.ok()) {
+ return _status;
+ }
+ if (_is_cancelled) {
+ return Status::Cancelled(fmt::format("Cancelled ({})", print_id(_fragment_id)));
+ }
+
+ // normal path end
+ if (_arrow_schema != nullptr) {
+ *arrow_schema = _arrow_schema;
return Status::OK();
}
- return Status::InternalError("Get Arrow Batch Abnormal Ending");
+
+ if (_is_close) {
+ return Status::RuntimeError(fmt::format("Closed ({})", print_id(_fragment_id)));
+ }
+ return Status::InternalError(
+ fmt::format("Get Arrow Schema Abnormal Ending ({})", print_id(_fragment_id)));
}
Status BufferControlBlock::close(Status exec_status) {
@@ -251,6 +396,7 @@ Status BufferControlBlock::close(Status exec_status) {
// notify blocked get thread
_data_arrival.notify_all();
+ _arrow_data_arrival.notify_all();
if (!_waiting_rpc.empty()) {
if (_status.ok()) {
for (auto& ctx : _waiting_rpc) {
@@ -263,18 +409,38 @@ Status BufferControlBlock::close(Status exec_status) {
}
_waiting_rpc.clear();
}
+
+ if (!_waiting_arrow_result_batch_rpc.empty()) {
+ if (_status.ok()) {
+ for (auto& ctx : _waiting_arrow_result_batch_rpc) {
+ ctx->on_close(_packet_num);
+ }
+ } else {
+ for (auto& ctx : _waiting_arrow_result_batch_rpc) {
+ ctx->on_failure(_status);
+ }
+ }
+ _waiting_arrow_result_batch_rpc.clear();
+ }
return Status::OK();
}
+// Marks the buffer cancelled and wakes every blocked producer and consumer.
+// Fails all parked fetch RPCs with `reason` and drops the queued arrow blocks.
void BufferControlBlock::cancel(const Status& reason) {
std::unique_lock<std::mutex> l(_lock);
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
_is_cancelled = true;
_data_removal.notify_all();
_data_arrival.notify_all();
+ _arrow_data_arrival.notify_all();
for (auto& ctx : _waiting_rpc) {
ctx->on_failure(reason);
}
_waiting_rpc.clear();
+ // Parked arrow fetch RPCs get the same cancel reason as the FE fetch RPCs
+ // above, so remote arrow flight clients see why the query was cancelled.
+ for (auto& ctx : _waiting_arrow_result_batch_rpc) {
+ ctx->on_failure(reason);
+ }
+ _waiting_arrow_result_batch_rpc.clear();
+ _arrow_flight_result_batch_queue.clear();
}
Status PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>&
result,
@@ -284,7 +450,7 @@ Status
PipBufferControlBlock::add_batch(std::unique_ptr<TFetchDataResult>& resul
return Status::OK();
}
-Status
PipBufferControlBlock::add_arrow_batch(std::shared_ptr<arrow::RecordBatch>&
result) {
+Status
PipBufferControlBlock::add_arrow_batch(std::shared_ptr<vectorized::Block>&
result) {
RETURN_IF_ERROR(BufferControlBlock::add_arrow_batch(result));
_update_dependency();
return Status::OK();
@@ -295,12 +461,18 @@ void PipBufferControlBlock::get_batch(GetResultBatchCtx*
ctx) {
_update_dependency();
}
-Status
PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
result) {
- RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result));
+Status
PipBufferControlBlock::get_arrow_batch(std::shared_ptr<vectorized::Block>*
result,
+ cctz::time_zone& timezone_obj) {
+ RETURN_IF_ERROR(BufferControlBlock::get_arrow_batch(result, timezone_obj));
_update_dependency();
return Status::OK();
}
+void PipBufferControlBlock::get_arrow_batch(GetArrowResultBatchCtx* ctx) {
+ BufferControlBlock::get_arrow_batch(ctx);
+ _update_dependency();
+}
+
void PipBufferControlBlock::cancel(const Status& reason) {
BufferControlBlock::cancel(reason);
_update_dependency();
@@ -322,7 +494,7 @@ void PipBufferControlBlock::_update_dependency() {
}
void PipBufferControlBlock::_update_batch_queue_empty() {
- _batch_queue_empty = _fe_result_batch_queue.empty() &&
_arrow_flight_batch_queue.empty();
+ _batch_queue_empty = _fe_result_batch_queue.empty() &&
_arrow_flight_result_batch_queue.empty();
_update_dependency();
}
diff --git a/be/src/runtime/buffer_control_block.h
b/be/src/runtime/buffer_control_block.h
index 9e991613f2e..724d86a2dc5 100644
--- a/be/src/runtime/buffer_control_block.h
+++ b/be/src/runtime/buffer_control_block.h
@@ -17,8 +17,11 @@
#pragma once
+#include <arrow/type.h>
+#include <cctz/time_zone.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
+#include <gen_cpp/segment_v2.pb.h>
#include <stdint.h>
#include <atomic>
@@ -29,7 +32,9 @@
#include <mutex>
#include "common/status.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/query_statistics.h"
+#include "util/runtime_profile.h"
namespace google {
namespace protobuf {
@@ -51,7 +56,13 @@ namespace pipeline {
class Dependency;
} // namespace pipeline
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
class PFetchDataResult;
+class PFetchArrowDataResult;
+class RuntimeState;
struct GetResultBatchCtx {
brpc::Controller* cntl = nullptr;
@@ -68,20 +79,46 @@ struct GetResultBatchCtx {
bool eos = false);
};
+// A pending brpc fetch request from a remote arrow flight reader.
+// BufferControlBlock answers it through one of on_failure / on_close / on_data.
+// Each of these writes into `result` and then deletes this object, so at most
+// one of them may be called per ctx.
+struct GetArrowResultBatchCtx {
+ brpc::Controller* cntl = nullptr;
+ PFetchArrowDataResult* result = nullptr;
+ google::protobuf::Closure* done = nullptr;
+
+ GetArrowResultBatchCtx(brpc::Controller* cntl_, PFetchArrowDataResult* result_,
+ google::protobuf::Closure* done_)
+ : cntl(cntl_), result(result_), done(done_) {}
+
+ // Reports an error status (must not be OK).
+ void on_failure(const Status& status);
+ // Reports end of stream with the final packet sequence number.
+ void on_close(int64_t packet_seq);
+ // Serializes `block` into `result` as packet `packet_seq`.
+ void on_data(const std::shared_ptr<vectorized::Block>& block, int64_t packet_seq,
+ int be_exec_version,
+ segment_v2::CompressionTypePB fragement_transmission_compression_type,
+ std::string timezone, RuntimeProfile::Counter* serialize_batch_ns_timer,
+ RuntimeProfile::Counter* uncompressed_bytes_counter,
+ RuntimeProfile::Counter* compressed_bytes_counter);
+};
+
// buffer used for result customer and producer
class BufferControlBlock {
public:
- BufferControlBlock(const TUniqueId& id, int buffer_size);
+ BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState*
state);
virtual ~BufferControlBlock();
Status init();
// Only one fragment is written, so can_sink returns true, then the sink
must be executed
virtual bool can_sink();
virtual Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool
is_pipeline = false);
- virtual Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>&
result);
+ virtual Status add_arrow_batch(std::shared_ptr<vectorized::Block>& result);
virtual void get_batch(GetResultBatchCtx* ctx);
- virtual Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
result);
+ // for ArrowFlightBatchLocalReader
+ virtual Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
+ cctz::time_zone& timezone_obj);
+ // for ArrowFlightBatchRemoteReader
+ virtual void get_arrow_batch(GetArrowResultBatchCtx* ctx);
+
+ virtual void register_arrow_schema(const std::shared_ptr<arrow::Schema>&
arrow_schema);
+ virtual Status find_arrow_schema(std::shared_ptr<arrow::Schema>*
arrow_schema);
// close buffer block, set _status to exec_status and set _is_close to
true;
// called because data has been read or error happened.
@@ -90,6 +127,7 @@ public:
virtual void cancel(const Status& reason);
[[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
+ [[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() { return
_mem_tracker; }
void update_return_rows(int64_t num_rows) {
// _query_statistics may be null when the result sink init failed
@@ -102,12 +140,12 @@ public:
protected:
virtual bool _get_batch_queue_empty() {
- return _fe_result_batch_queue.empty() &&
_arrow_flight_batch_queue.empty();
+ return _fe_result_batch_queue.empty() &&
_arrow_flight_result_batch_queue.empty();
}
virtual void _update_batch_queue_empty() {}
using FeResultQueue = std::list<std::unique_ptr<TFetchDataResult>>;
- using ArrowFlightResultQueue =
std::list<std::shared_ptr<arrow::RecordBatch>>;
+ using ArrowFlightResultQueue =
std::list<std::shared_ptr<vectorized::Block>>;
// result's query id
TUniqueId _fragment_id;
@@ -120,7 +158,9 @@ protected:
// blocking queue for batch
FeResultQueue _fe_result_batch_queue;
- ArrowFlightResultQueue _arrow_flight_batch_queue;
+ ArrowFlightResultQueue _arrow_flight_result_batch_queue;
+ // for arrow flight
+ std::shared_ptr<arrow::Schema> _arrow_schema;
// protects all subsequent data in this block
std::mutex _lock;
@@ -128,17 +168,33 @@ protected:
std::condition_variable _data_arrival;
// signal removal of data by stream consumer
std::condition_variable _data_removal;
+ // get arrow flight result is a sync method, need wait for data ready and
return result.
+ // TODO, waiting for data will block pipeline, so use a request pool to
save requests waiting for data.
+ std::condition_variable _arrow_data_arrival;
std::deque<GetResultBatchCtx*> _waiting_rpc;
+ std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc;
// only used for FE using return rows to check limit
std::unique_ptr<QueryStatistics> _query_statistics;
+
+ std::string _timezone;
+ cctz::time_zone _timezone_obj;
+ int _be_exec_version;
+ segment_v2::CompressionTypePB _fragement_transmission_compression_type;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+
+ // only used for ArrowFlightBatchRemoteReader
+ RuntimeProfile _profile;
+ RuntimeProfile::Counter* _serialize_batch_ns_timer = nullptr;
+ RuntimeProfile::Counter* _uncompressed_bytes_counter = nullptr;
+ RuntimeProfile::Counter* _compressed_bytes_counter = nullptr;
};
class PipBufferControlBlock : public BufferControlBlock {
public:
- PipBufferControlBlock(const TUniqueId& id, int buffer_size)
- : BufferControlBlock(id, buffer_size) {}
+ PipBufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState*
state)
+ : BufferControlBlock(id, buffer_size, state) {}
bool can_sink() override {
return _get_batch_queue_empty() || _buffer_rows < _buffer_limit ||
_is_cancelled;
@@ -146,11 +202,14 @@ public:
Status add_batch(std::unique_ptr<TFetchDataResult>& result, bool
is_pipeline = true) override;
- Status add_arrow_batch(std::shared_ptr<arrow::RecordBatch>& result)
override;
+ Status add_arrow_batch(std::shared_ptr<vectorized::Block>& result)
override;
void get_batch(GetResultBatchCtx* ctx) override;
- Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result)
override;
+ Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
+ cctz::time_zone& timezone_obj) override;
+
+ void get_arrow_batch(GetArrowResultBatchCtx* ctx) override;
void cancel(const Status& reason) override;
diff --git a/be/src/runtime/result_buffer_mgr.cpp
b/be/src/runtime/result_buffer_mgr.cpp
index f81c9b1094f..a15b3115d18 100644
--- a/be/src/runtime/result_buffer_mgr.cpp
+++ b/be/src/runtime/result_buffer_mgr.cpp
@@ -34,6 +34,7 @@
#include "arrow/type_fwd.h"
#include "common/status.h"
#include "runtime/buffer_control_block.h"
+#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/thread.h"
@@ -69,7 +70,7 @@ Status ResultBufferMgr::init() {
Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int
buffer_size,
std::shared_ptr<BufferControlBlock>*
sender,
- bool enable_pipeline, int exec_timout) {
+ bool enable_pipeline, RuntimeState*
state) {
*sender = find_control_block(query_id);
if (*sender != nullptr) {
LOG(WARNING) << "already have buffer control block for this instance "
<< query_id;
@@ -79,9 +80,9 @@ Status ResultBufferMgr::create_sender(const TUniqueId&
query_id, int buffer_size
std::shared_ptr<BufferControlBlock> control_block = nullptr;
if (enable_pipeline) {
- control_block = std::make_shared<PipBufferControlBlock>(query_id,
buffer_size);
+ control_block = std::make_shared<PipBufferControlBlock>(query_id,
buffer_size, state);
} else {
- control_block = std::make_shared<BufferControlBlock>(query_id,
buffer_size);
+ control_block = std::make_shared<BufferControlBlock>(query_id,
buffer_size, state);
}
{
@@ -92,7 +93,7 @@ Status ResultBufferMgr::create_sender(const TUniqueId&
query_id, int buffer_size
// otherwise in some case may block all fragment handle threads
// details see issue https://github.com/apache/doris/issues/16203
// add extra 5s for avoid corner case
- int64_t max_timeout = time(nullptr) + exec_timout + 5;
+ int64_t max_timeout = time(nullptr) + state->execution_timeout() + 5;
cancel_at_time(max_timeout, query_id);
}
*sender = control_block;
@@ -110,27 +111,19 @@ std::shared_ptr<BufferControlBlock>
ResultBufferMgr::find_control_block(const TU
return std::shared_ptr<BufferControlBlock>();
}
-void ResultBufferMgr::register_arrow_schema(const TUniqueId& query_id,
- const
std::shared_ptr<arrow::Schema>& arrow_schema) {
- std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
- _arrow_schema_map.insert(std::make_pair(query_id, arrow_schema));
-}
-
-std::shared_ptr<arrow::Schema> ResultBufferMgr::find_arrow_schema(const
TUniqueId& query_id) {
- std::shared_lock<std::shared_mutex> rlock(_arrow_schema_map_lock);
- auto iter = _arrow_schema_map.find(query_id);
-
- if (_arrow_schema_map.end() != iter) {
- return iter->second;
+Status ResultBufferMgr::find_arrow_schema(const TUniqueId& finst_id,
+ std::shared_ptr<arrow::Schema>*
schema) {
+ std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
+ if (cb == nullptr) {
+ return Status::InternalError(
+ "no arrow schema for this query, maybe query has been
canceled, finst_id={}",
+ print_id(finst_id));
}
-
- return nullptr;
+ return cb->find_arrow_schema(schema);
}
void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx*
ctx) {
- TUniqueId tid;
- tid.__set_hi(finst_id.hi());
- tid.__set_lo(finst_id.lo());
+ TUniqueId tid = UniqueId(finst_id).to_thrift();
std::shared_ptr<BufferControlBlock> cb = find_control_block(tid);
if (cb == nullptr) {
LOG(WARNING) << "no result for this query, id=" << print_id(tid);
@@ -140,17 +133,43 @@ void ResultBufferMgr::fetch_data(const PUniqueId&
finst_id, GetResultBatchCtx* c
cb->get_batch(ctx);
}
+// Looks up the MemTrackerLimiter of the BufferControlBlock registered for
+// `finst_id`. Returns InternalError if no block is registered (e.g. the query
+// was already cancelled and its block removed).
+Status ResultBufferMgr::find_mem_tracker(const TUniqueId& finst_id,
+ std::shared_ptr<MemTrackerLimiter>* mem_tracker) {
+ std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
+ if (cb == nullptr) {
+ return Status::InternalError(
+ "no result for this query, maybe query has been canceled, finst_id={}",
+ print_id(finst_id));
+ }
+ *mem_tracker = cb->mem_tracker();
+ return Status::OK();
+}
+
Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
- std::shared_ptr<arrow::RecordBatch>*
result) {
+ std::shared_ptr<vectorized::Block>*
result,
+ cctz::time_zone& timezone_obj) {
std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
if (cb == nullptr) {
- LOG(WARNING) << "no result for this query, id=" << print_id(finst_id);
- return Status::InternalError("no result for this query");
+ return Status::InternalError(
+ "no result for this query, maybe query has been canceled,
finst_id={}",
+ print_id(finst_id));
}
- RETURN_IF_ERROR(cb->get_arrow_batch(result));
+ RETURN_IF_ERROR(cb->get_arrow_batch(result, timezone_obj));
return Status::OK();
}
+// Overload for remote (brpc) readers. Forwards `ctx` to the BufferControlBlock
+// registered for `finst_id`, or fails it with InternalError if none exists.
+void ResultBufferMgr::fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx* ctx) {
+ TUniqueId tid = UniqueId(finst_id).to_thrift();
+ std::shared_ptr<BufferControlBlock> cb = find_control_block(tid);
+ if (cb == nullptr) {
+ ctx->on_failure(Status::InternalError(
+ "no result for this query, maybe query has been canceled, finst_id={}",
+ print_id(tid)));
+ return;
+ }
+ cb->get_arrow_batch(ctx);
+}
+
void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
@@ -161,15 +180,6 @@ void ResultBufferMgr::cancel(const TUniqueId& query_id,
const Status& reason) {
_buffer_map.erase(iter);
}
}
-
- {
- std::unique_lock<std::shared_mutex> wlock(_arrow_schema_map_lock);
- auto arrow_schema_iter = _arrow_schema_map.find(query_id);
-
- if (_arrow_schema_map.end() != arrow_schema_iter) {
- _arrow_schema_map.erase(arrow_schema_iter);
- }
- }
}
void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId&
query_id) {
diff --git a/be/src/runtime/result_buffer_mgr.h
b/be/src/runtime/result_buffer_mgr.h
index 18846684233..971974a3b63 100644
--- a/be/src/runtime/result_buffer_mgr.h
+++ b/be/src/runtime/result_buffer_mgr.h
@@ -17,7 +17,9 @@
#pragma once
+#include <cctz/time_zone.h>
#include <gen_cpp/Types_types.h>
+#include <gen_cpp/segment_v2.pb.h>
#include <ctime>
#include <map>
@@ -41,8 +43,14 @@ namespace doris {
class BufferControlBlock;
struct GetResultBatchCtx;
+struct GetArrowResultBatchCtx;
class PUniqueId;
+class RuntimeState;
+class MemTrackerLimiter;
class Thread;
+namespace vectorized {
+class Block;
+} // namespace vectorized
// manage all result buffer control block in one backend
class ResultBufferMgr {
@@ -59,16 +67,18 @@ public:
// sender is not used when call cancel or unregister
Status create_sender(const TUniqueId& query_id, int buffer_size,
std::shared_ptr<BufferControlBlock>* sender, bool
enable_pipeline,
- int exec_timeout);
+ RuntimeState* state);
// fetch data result to FE
void fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* ctx);
- // fetch data result to Arrow Flight Server
- Status fetch_arrow_data(const TUniqueId& finst_id,
std::shared_ptr<arrow::RecordBatch>* result);
-
- void register_arrow_schema(const TUniqueId& query_id,
- const std::shared_ptr<arrow::Schema>&
arrow_schema);
- std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId&
query_id);
+ // fetch data result to Arrow Flight Client
+ Status fetch_arrow_data(const TUniqueId& finst_id,
std::shared_ptr<vectorized::Block>* result,
+ cctz::time_zone& timezone_obj);
+ // fetch data result to Other BE forwards to Client
+ void fetch_arrow_data(const PUniqueId& finst_id, GetArrowResultBatchCtx*
ctx);
+ Status find_mem_tracker(const TUniqueId& finst_id,
+ std::shared_ptr<MemTrackerLimiter>* mem_tracker);
+ Status find_arrow_schema(const TUniqueId& query_id,
std::shared_ptr<arrow::Schema>* schema);
// cancel
void cancel(const TUniqueId& query_id, const Status& reason);
@@ -79,7 +89,6 @@ public:
private:
using BufferMap = std::unordered_map<TUniqueId,
std::shared_ptr<BufferControlBlock>>;
using TimeoutMap = std::map<time_t, std::vector<TUniqueId>>;
- using ArrowSchemaMap = std::unordered_map<TUniqueId,
std::shared_ptr<arrow::Schema>>;
std::shared_ptr<BufferControlBlock> find_control_block(const TUniqueId&
query_id);
@@ -91,10 +100,6 @@ private:
std::shared_mutex _buffer_map_lock;
// buffer block map
BufferMap _buffer_map;
- // lock for arrow schema map
- std::shared_mutex _arrow_schema_map_lock;
- // for arrow flight
- ArrowSchemaMap _arrow_schema_map;
// lock for timeout map
std::mutex _timeout_lock;
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
index a07e479d759..e935aff996d 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
@@ -17,53 +17,294 @@
#include "service/arrow_flight/arrow_flight_batch_reader.h"
+#include <arrow/io/memory.h>
+#include <arrow/ipc/reader.h>
#include <arrow/status.h>
+#include <arrow/type.h>
+#include <gen_cpp/internal_service.pb.h>
-#include "arrow/builder.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/result_buffer_mgr.h"
+#include "runtime/thread_context.h"
+#include "service/backend_options.h"
+#include "util/arrow/block_convertor.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/utils.h"
+#include "util/brpc_client_cache.h"
+#include "util/ref_count_closure.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
-namespace doris {
-namespace flight {
+namespace doris::flight {
-std::shared_ptr<arrow::Schema> ArrowFlightBatchReader::schema() const {
- return schema_;
+ArrowFlightBatchReaderBase::ArrowFlightBatchReaderBase(
+ const std::shared_ptr<QueryStatement>& statement)
+ : _statement(statement) {}
+
+std::shared_ptr<arrow::Schema> ArrowFlightBatchReaderBase::schema() const {
+ return _schema;
+}
+
+arrow::Status ArrowFlightBatchReaderBase::_return_invalid_status(const
std::string& msg) {
+ std::string status_msg =
+ fmt::format("ArrowFlightBatchReader {}, packet_seq={},
result={}:{}, finistId={}", msg,
+ _packet_seq, _statement->result_addr.hostname,
_statement->result_addr.port,
+ print_id(_statement->query_id));
+ LOG(WARNING) << status_msg;
+ return arrow::Status::Invalid(status_msg);
}
-ArrowFlightBatchReader::ArrowFlightBatchReader(std::shared_ptr<QueryStatement>
statement,
- std::shared_ptr<arrow::Schema>
schema)
- : statement_(std::move(statement)), schema_(std::move(schema)) {}
+ArrowFlightBatchReaderBase::~ArrowFlightBatchReaderBase() {
+ VLOG_NOTICE << fmt::format(
+ "ArrowFlightBatchReader finished, packet_seq={},
result_addr={}:{}, finistId={}, "
+ "convert_arrow_batch_timer={}, deserialize_block_timer={},
peak_memory_usage={}",
+ _packet_seq, _statement->result_addr.hostname,
_statement->result_addr.port,
+ print_id(_statement->query_id), _convert_arrow_batch_timer,
_deserialize_block_timer,
+ _mem_tracker->peak_consumption());
+}
-arrow::Result<std::shared_ptr<ArrowFlightBatchReader>>
ArrowFlightBatchReader::Create(
- const std::shared_ptr<QueryStatement>& statement_) {
+ArrowFlightBatchLocalReader::ArrowFlightBatchLocalReader(
+ const std::shared_ptr<QueryStatement>& statement,
+ const std::shared_ptr<arrow::Schema>& schema,
+ const std::shared_ptr<MemTrackerLimiter>& mem_tracker)
+ : ArrowFlightBatchReaderBase(statement) {
+ _schema = schema;
+ _mem_tracker = mem_tracker;
+}
+
+arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>>
ArrowFlightBatchLocalReader::Create(
+ const std::shared_ptr<QueryStatement>& statement) {
+ DCHECK(statement->result_addr.hostname == BackendOptions::get_localhost());
// Make sure that FE send the fragment to BE and creates the
BufferControlBlock before returning ticket
// to the ADBC client, so that the schema and control block can be found.
- auto schema =
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement_->query_id);
- if (schema == nullptr) {
- ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
- "Client not found arrow flight schema, maybe query has been
canceled, queryid: {}",
- print_id(statement_->query_id))));
+ std::shared_ptr<arrow::Schema> schema;
+ RETURN_ARROW_STATUS_IF_ERROR(
+
ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(statement->query_id,
&schema));
+ std::shared_ptr<MemTrackerLimiter> mem_tracker;
+
RETURN_ARROW_STATUS_IF_ERROR(ExecEnv::GetInstance()->result_mgr()->find_mem_tracker(
+ statement->query_id, &mem_tracker));
+
+ std::shared_ptr<ArrowFlightBatchLocalReader> result(
+ new ArrowFlightBatchLocalReader(statement, schema, mem_tracker));
+ return result;
+}
+
+arrow::Status
ArrowFlightBatchLocalReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out)
{
+ // parameter *out not nullptr
+ *out = nullptr;
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ std::shared_ptr<vectorized::Block> result;
+ auto st =
ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(_statement->query_id,
&result,
+
_timezone_obj);
+ st.prepend("ArrowFlightBatchLocalReader fetch arrow data failed");
+ ARROW_RETURN_NOT_OK(to_arrow_status(st));
+ if (result == nullptr) {
+ // eof, normal path end
+ return arrow::Status::OK();
+ }
+
+ {
+ // convert one batch
+ SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer);
+ st = convert_to_arrow_batch(*result, _schema,
arrow::default_memory_pool(), out,
+ _timezone_obj);
+ st.prepend("ArrowFlightBatchLocalReader convert block to arrow batch
failed");
+ ARROW_RETURN_NOT_OK(to_arrow_status(st));
+ }
+
+ _packet_seq++;
+ if (*out != nullptr) {
+ VLOG_NOTICE << "ArrowFlightBatchLocalReader read next: " <<
(*out)->num_rows() << ", "
+ << (*out)->num_columns() << ", packet_seq: " <<
_packet_seq;
}
- std::shared_ptr<ArrowFlightBatchReader> result(new
ArrowFlightBatchReader(statement_, schema));
+ return arrow::Status::OK();
+}
+
+ArrowFlightBatchRemoteReader::ArrowFlightBatchRemoteReader(
+ const std::shared_ptr<QueryStatement>& statement,
+ const std::shared_ptr<PBackendService_Stub>& stub)
+ : ArrowFlightBatchReaderBase(statement), _brpc_stub(stub),
_block(nullptr) {
+ _mem_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::QUERY,
+ fmt::format("ArrowFlightBatchRemoteReader#QueryId={}",
print_id(_statement->query_id)));
+}
+
+arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>>
ArrowFlightBatchRemoteReader::Create(
+ const std::shared_ptr<QueryStatement>& statement) {
+ std::shared_ptr<PBackendService_Stub> stub =
+ ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
+ statement->result_addr);
+ if (!stub) {
+ std::string msg = fmt::format(
+ "ArrowFlightBatchRemoteReader get rpc stub failed,
result_addr={}:{}, finistId={}",
+ statement->result_addr.hostname, statement->result_addr.port,
+ print_id(statement->query_id));
+ LOG(WARNING) << msg;
+ return arrow::Status::Invalid(msg);
+ }
+
+ std::shared_ptr<ArrowFlightBatchRemoteReader> result(
+ new ArrowFlightBatchRemoteReader(statement, stub));
+ ARROW_RETURN_NOT_OK(result->init_schema());
return result;
}
-arrow::Status
ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
- // *out not nullptr
+arrow::Status ArrowFlightBatchRemoteReader::_fetch_schema() {
+ Status st;
+ auto request = std::make_shared<PFetchArrowFlightSchemaRequest>();
+ auto* pfinst_id = request->mutable_finst_id();
+ pfinst_id->set_hi(_statement->query_id.hi);
+ pfinst_id->set_lo(_statement->query_id.lo);
+ auto callback =
DummyBrpcCallback<PFetchArrowFlightSchemaResult>::create_shared();
+ auto closure = AutoReleaseClosure<
+ PFetchArrowFlightSchemaRequest,
+
DummyBrpcCallback<PFetchArrowFlightSchemaResult>>::create_unique(request,
callback);
+
callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms);
+ callback->cntl_->ignore_eovercrowded();
+
+ _brpc_stub->fetch_arrow_flight_schema(closure->cntl_.get(),
closure->request_.get(),
+ closure->response_.get(),
closure.get());
+ closure.release();
+ callback->join();
+
+ if (callback->cntl_->Failed()) {
+ if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
+ _brpc_stub, _statement->result_addr.hostname,
_statement->result_addr.port)) {
+ ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+ callback->cntl_->remote_side());
+ }
+ auto error_code = callback->cntl_->ErrorCode();
+ auto error_text = callback->cntl_->ErrorText();
+ return _return_invalid_status(fmt::format("fetch schema error: {},
error_text: {}",
+ berror(error_code),
error_text));
+ }
+ st = Status::create(callback->response_->status());
+ ARROW_RETURN_NOT_OK(to_arrow_status(st));
+
+ if (callback->response_->has_schema() &&
!callback->response_->schema().empty()) {
+ auto input =
+
arrow::io::BufferReader::FromString(std::string(callback->response_->schema()));
+ ARROW_ASSIGN_OR_RAISE(auto reader,
+ arrow::ipc::RecordBatchStreamReader::Open(
+ input.get(),
arrow::ipc::IpcReadOptions::Defaults()));
+ _schema = reader->schema();
+ } else {
+ return _return_invalid_status(fmt::format("fetch schema error: not
find schema"));
+ }
+ return arrow::Status::OK();
+}
+
+arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() {
+ DCHECK(_block == nullptr);
+ while (true) {
+ // if `continue` occurs, data is invalid, continue fetch, block is
nullptr.
+ // if `break` occurs, fetch data successfully (block is not nullptr)
or fetch eos.
+ Status st;
+ auto request = std::make_shared<PFetchArrowDataRequest>();
+ auto* pfinst_id = request->mutable_finst_id();
+ pfinst_id->set_hi(_statement->query_id.hi);
+ pfinst_id->set_lo(_statement->query_id.lo);
+ auto callback =
DummyBrpcCallback<PFetchArrowDataResult>::create_shared();
+ auto closure = AutoReleaseClosure<
+ PFetchArrowDataRequest,
+
DummyBrpcCallback<PFetchArrowDataResult>>::create_unique(request, callback);
+
callback->cntl_->set_timeout_ms(config::arrow_flight_reader_brpc_controller_timeout_ms);
+ callback->cntl_->ignore_eovercrowded();
+
+ _brpc_stub->fetch_arrow_data(closure->cntl_.get(),
closure->request_.get(),
+ closure->response_.get(), closure.get());
+ closure.release();
+ callback->join();
+
+ if (callback->cntl_->Failed()) {
+ if
(!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
+ _brpc_stub, _statement->result_addr.hostname,
+ _statement->result_addr.port)) {
+ ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
+ callback->cntl_->remote_side());
+ }
+ auto error_code = callback->cntl_->ErrorCode();
+ auto error_text = callback->cntl_->ErrorText();
+ return _return_invalid_status(fmt::format("fetch data error={},
error_text: {}",
+ berror(error_code),
error_text));
+ }
+ st = Status::create(callback->response_->status());
+ ARROW_RETURN_NOT_OK(to_arrow_status(st));
+
+ DCHECK(callback->response_->has_packet_seq());
+ if (_packet_seq != callback->response_->packet_seq()) {
+ return _return_invalid_status(
+ fmt::format("fetch data receive packet failed, expect: {},
receive: {}",
+ _packet_seq,
callback->response_->packet_seq()));
+ }
+ _packet_seq++;
+
+ if (callback->response_->has_eos() && callback->response_->eos()) {
+ break;
+ }
+
+ if (callback->response_->has_empty_batch() &&
callback->response_->empty_batch()) {
+ continue;
+ }
+
+ DCHECK(callback->response_->has_block());
+ if (callback->response_->block().ByteSizeLong() == 0) {
+ continue;
+ }
+
+ std::call_once(_timezone_once_flag, [this, callback] {
+ DCHECK(callback->response_->has_timezone());
+
TimezoneUtils::find_cctz_time_zone(callback->response_->timezone(),
_timezone_obj);
+ });
+
+ {
+ SCOPED_ATOMIC_TIMER(&_deserialize_block_timer);
+ _block = vectorized::Block::create_shared();
+ st = _block->deserialize(callback->response_->block());
+ ARROW_RETURN_NOT_OK(to_arrow_status(st));
+ break;
+ }
+
+ const auto rows = _block->rows();
+ if (rows == 0) {
+ _block = nullptr;
+ continue;
+ }
+ }
+ return arrow::Status::OK();
+}
+
+arrow::Status ArrowFlightBatchRemoteReader::init_schema() {
+ ARROW_RETURN_NOT_OK(_fetch_schema());
+ DCHECK(_schema != nullptr);
+ return arrow::Status::OK();
+}
+
+arrow::Status
ArrowFlightBatchRemoteReader::ReadNext(std::shared_ptr<arrow::RecordBatch>*
out) {
+ // parameter *out not nullptr
*out = nullptr;
- auto st =
ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id,
out);
- if (UNLIKELY(!st.ok())) {
- LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " +
st.to_string();
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ ARROW_RETURN_NOT_OK(_fetch_data());
+ if (_block == nullptr) {
+ // eof, normal path end, last _fetch_data return block is nullptr
+ return arrow::Status::OK();
+ }
+ {
+ // convert one batch
+ SCOPED_ATOMIC_TIMER(&_convert_arrow_batch_timer);
+ auto st = convert_to_arrow_batch(*_block, _schema,
arrow::default_memory_pool(), out,
+ _timezone_obj);
+ st.prepend("ArrowFlightBatchRemoteReader convert block to arrow batch
failed");
ARROW_RETURN_NOT_OK(to_arrow_status(st));
}
+ _block = nullptr;
+
if (*out != nullptr) {
- VLOG_NOTICE << "ArrowFlightBatchReader read next: " <<
(*out)->num_rows() << ", "
- << (*out)->num_columns();
+ VLOG_NOTICE << "ArrowFlightBatchRemoteReader read next: " <<
(*out)->num_rows() << ", "
+ << (*out)->num_columns() << ", packet_seq: " <<
_packet_seq;
}
return arrow::Status::OK();
}
-} // namespace flight
-} // namespace doris
+} // namespace doris::flight
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.h
b/be/src/service/arrow_flight/arrow_flight_batch_reader.h
index e0279cbb70d..612ebc8063c 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.h
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.h
@@ -17,40 +17,91 @@
#pragma once
+#include <cctz/time_zone.h>
#include <gen_cpp/Types_types.h>
#include <memory>
+#include <utility>
#include "arrow/record_batch.h"
+#include "runtime/exec_env.h"
namespace doris {
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
namespace flight {
struct QueryStatement {
public:
TUniqueId query_id;
+ TNetworkAddress result_addr; // BE brpc ip & port
std::string sql;
- QueryStatement(const TUniqueId& query_id_, const std::string& sql_)
- : query_id(query_id_), sql(sql_) {}
+ QueryStatement(TUniqueId query_id_, TNetworkAddress result_addr_,
std::string sql_)
+ : query_id(std::move(query_id_)),
+ result_addr(std::move(result_addr_)),
+ sql(std::move(sql_)) {}
+};
+
+class ArrowFlightBatchReaderBase : public arrow::RecordBatchReader {
+public:
+ // RecordBatchReader force override
+ [[nodiscard]] std::shared_ptr<arrow::Schema> schema() const override;
+
+protected:
+ ArrowFlightBatchReaderBase(const std::shared_ptr<QueryStatement>&
statement);
+ ~ArrowFlightBatchReaderBase() override;
+ arrow::Status _return_invalid_status(const std::string& msg);
+
+ std::shared_ptr<QueryStatement> _statement;
+ std::shared_ptr<arrow::Schema> _schema;
+ cctz::time_zone _timezone_obj;
+ std::atomic<int64_t> _packet_seq = 0;
+
+ std::atomic<int64_t> _convert_arrow_batch_timer = 0;
+ std::atomic<int64_t> _deserialize_block_timer = 0;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};
-class ArrowFlightBatchReader : public arrow::RecordBatchReader {
+class ArrowFlightBatchLocalReader : public ArrowFlightBatchReaderBase {
public:
- static arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> Create(
+ static arrow::Result<std::shared_ptr<ArrowFlightBatchLocalReader>> Create(
const std::shared_ptr<QueryStatement>& statement);
- [[nodiscard]] std::shared_ptr<arrow::Schema> schema() const override;
+ arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override;
+private:
+ ArrowFlightBatchLocalReader(const std::shared_ptr<QueryStatement>&
statement,
+ const std::shared_ptr<arrow::Schema>& schema,
+ const std::shared_ptr<MemTrackerLimiter>&
mem_tracker);
+};
+
+class ArrowFlightBatchRemoteReader : public ArrowFlightBatchReaderBase {
+public:
+ static arrow::Result<std::shared_ptr<ArrowFlightBatchRemoteReader>> Create(
+ const std::shared_ptr<QueryStatement>& statement);
+
+ // create arrow RecordBatchReader must initialize the schema.
+ // so when creating arrow RecordBatchReader, fetch result data once,
+ // which will return Block and some necessary information, and extract
arrow schema from Block.
+ arrow::Status init_schema();
arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* out) override;
private:
- std::shared_ptr<QueryStatement> statement_;
- std::shared_ptr<arrow::Schema> schema_;
+ ArrowFlightBatchRemoteReader(const std::shared_ptr<QueryStatement>&
statement,
+ const std::shared_ptr<PBackendService_Stub>&
stub);
- ArrowFlightBatchReader(std::shared_ptr<QueryStatement> statement,
- std::shared_ptr<arrow::Schema> schema);
+ arrow::Status _fetch_schema();
+ arrow::Status _fetch_data();
+
+ std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
+ std::once_flag _timezone_once_flag;
+ std::shared_ptr<vectorized::Block> _block;
};
} // namespace flight
+
} // namespace doris
diff --git a/be/src/service/arrow_flight/flight_sql_service.cpp
b/be/src/service/arrow_flight/flight_sql_service.cpp
index 60b665c62fc..90ee3edfbea 100644
--- a/be/src/service/arrow_flight/flight_sql_service.cpp
+++ b/be/src/service/arrow_flight/flight_sql_service.cpp
@@ -19,15 +19,17 @@
#include <arrow/status.h>
+#include <memory>
+
#include "arrow/flight/sql/server.h"
+#include "gutil/strings/split.h"
#include "service/arrow_flight/arrow_flight_batch_reader.h"
#include "service/arrow_flight/flight_sql_info.h"
#include "service/backend_options.h"
#include "util/arrow/utils.h"
#include "util/uid_util.h"
-namespace doris {
-namespace flight {
+namespace doris::flight {
class FlightSqlServer::Impl {
private:
@@ -41,14 +43,21 @@ private:
return arrow::flight::Ticket {std::move(ticket)};
}
- arrow::Result<std::pair<std::string, std::string>> decode_ticket(const
std::string& ticket) {
- auto divider = ticket.find(':');
- if (divider == std::string::npos) {
- return arrow::Status::Invalid("Malformed ticket");
+ arrow::Result<std::shared_ptr<QueryStatement>> decode_ticket(const
std::string& ticket) {
+ std::vector<string> fields = strings::Split(ticket, "&");
+ if (fields.size() != 4) {
+ return arrow::Status::Invalid(fmt::format("Malformed ticket, size:
{}", fields.size()));
}
- std::string query_id = ticket.substr(0, divider);
- std::string sql = ticket.substr(divider + 1);
- return std::make_pair(std::move(sql), std::move(query_id));
+
+ TUniqueId queryid;
+ parse_id(fields[0], &queryid);
+ TNetworkAddress result_addr;
+ result_addr.hostname = fields[1];
+ result_addr.port = std::stoi(fields[2]);
+ std::string sql = fields[3];
+ std::shared_ptr<QueryStatement> statement =
+ std::make_shared<QueryStatement>(queryid, result_addr, sql);
+ return statement;
}
public:
@@ -59,18 +68,21 @@ public:
arrow::Result<std::unique_ptr<arrow::flight::FlightDataStream>>
DoGetStatement(
const arrow::flight::ServerCallContext& context,
const arrow::flight::sql::StatementQueryTicket& command) {
- ARROW_ASSIGN_OR_RAISE(auto pair,
decode_ticket(command.statement_handle));
- const std::string& sql = pair.first;
- const std::string query_id = pair.second;
- TUniqueId queryid;
- parse_id(query_id, &queryid);
-
- auto statement = std::make_shared<QueryStatement>(queryid, sql);
-
- std::shared_ptr<ArrowFlightBatchReader> reader;
- ARROW_ASSIGN_OR_RAISE(reader,
ArrowFlightBatchReader::Create(statement));
-
- return std::make_unique<arrow::flight::RecordBatchStream>(reader);
+ ARROW_ASSIGN_OR_RAISE(auto statement,
decode_ticket(command.statement_handle));
+ // if IP:BrpcPort in the Ticket is not current BE node,
+ // pulls the query result Block from the BE node specified by
IP:BrpcPort,
+ // converts it to Arrow Batch and returns it to ADBC client.
+ // use brpc to transmit blocks between BEs.
+ if (statement->result_addr.hostname == BackendOptions::get_localhost()
&&
+ statement->result_addr.port == config::brpc_port) {
+ std::shared_ptr<ArrowFlightBatchLocalReader> reader;
+ ARROW_ASSIGN_OR_RAISE(reader,
ArrowFlightBatchLocalReader::Create(statement));
+ return std::make_unique<arrow::flight::RecordBatchStream>(reader);
+ } else {
+ std::shared_ptr<ArrowFlightBatchRemoteReader> reader;
+ ARROW_ASSIGN_OR_RAISE(reader,
ArrowFlightBatchRemoteReader::Create(statement));
+ return std::make_unique<arrow::flight::RecordBatchStream>(reader);
+ }
}
};
@@ -135,5 +147,4 @@ Status FlightSqlServer::join() {
return Status::OK();
}
-} // namespace flight
-} // namespace doris
+} // namespace doris::flight
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index bcadbfd90e7..a82ab9988b1 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -155,6 +155,11 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::N
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_queue_size,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_active_threads,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_pool_max_queue_size,
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(arrow_flight_work_max_threads,
MetricUnit::NOUNIT);
+
bthread_key_t btls_key;
static void thread_context_deleter(void* d) {
@@ -219,7 +224,14 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv*
exec_env)
config::brpc_light_work_pool_max_queue_size != -1
?
config::brpc_light_work_pool_max_queue_size
: std::max(10240, CpuInfo::num_cores() *
320),
- "brpc_light") {
+ "brpc_light"),
+ _arrow_flight_work_pool(config::brpc_arrow_flight_work_pool_threads
!= -1
+ ?
config::brpc_arrow_flight_work_pool_threads
+ : std::max(512, CpuInfo::num_cores()
* 16),
+
config::brpc_arrow_flight_work_pool_max_queue_size != -1
+ ?
config::brpc_arrow_flight_work_pool_max_queue_size
+ : std::max(20480,
CpuInfo::num_cores() * 640),
+ "brpc_arrow_flight") {
REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
[this]() { return _heavy_work_pool.get_queue_size();
});
REGISTER_HOOK_METRIC(light_work_pool_queue_size,
@@ -238,6 +250,15 @@ PInternalServiceImpl::PInternalServiceImpl(ExecEnv*
exec_env)
REGISTER_HOOK_METRIC(light_work_max_threads,
[]() { return config::brpc_light_work_pool_threads;
});
+ REGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size,
+ [this]() { return
_arrow_flight_work_pool.get_queue_size(); });
+ REGISTER_HOOK_METRIC(arrow_flight_work_active_threads,
+ [this]() { return
_arrow_flight_work_pool.get_active_threads(); });
+ REGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size,
+ []() { return
config::brpc_arrow_flight_work_pool_max_queue_size; });
+ REGISTER_HOOK_METRIC(arrow_flight_work_max_threads,
+ []() { return
config::brpc_arrow_flight_work_pool_threads; });
+
_exec_env->load_stream_mgr()->set_heavy_work_pool(&_heavy_work_pool);
_exec_env->load_stream_mgr()->set_light_work_pool(&_light_work_pool);
@@ -256,6 +277,11 @@ PInternalServiceImpl::~PInternalServiceImpl() {
DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
DEREGISTER_HOOK_METRIC(light_work_max_threads);
+ DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(arrow_flight_work_active_threads);
+ DEREGISTER_HOOK_METRIC(arrow_flight_work_pool_max_queue_size);
+ DEREGISTER_HOOK_METRIC(arrow_flight_work_max_threads);
+
CHECK_EQ(0, bthread_key_delete(btls_key));
CHECK_EQ(0, bthread_key_delete(AsyncIO::btls_io_ctx_key));
}
@@ -672,6 +698,22 @@ void
PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controlle
}
}
+void PInternalServiceImpl::fetch_arrow_data(google::protobuf::RpcController*
controller,
+ const PFetchArrowDataRequest*
request,
+ PFetchArrowDataResult* result,
+ google::protobuf::Closure* done) {
+ bool ret = _arrow_flight_work_pool.try_offer([this, controller, request,
result, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ auto* cntl = static_cast<brpc::Controller*>(controller);
+ auto* ctx = new GetArrowResultBatchCtx(cntl, result, done);
+ _exec_env->result_mgr()->fetch_arrow_data(request->finst_id(), ctx);
+ });
+ if (!ret) {
+ offer_failed(result, done, _arrow_flight_work_pool);
+ return;
+ }
+}
+
void
PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController*
controller,
const
POutfileWriteSuccessRequest* request,
POutfileWriteSuccessResult*
result,
@@ -877,23 +919,21 @@ void
PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
google::protobuf::Closure* done) {
bool ret = _light_work_pool.try_offer([request, result, done]() {
brpc::ClosureGuard closure_guard(done);
- std::shared_ptr<arrow::Schema> schema =
- ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
- UniqueId(request->finst_id()).to_thrift());
- if (schema == nullptr) {
- LOG(INFO) << "FE not found arrow flight schema, maybe query has
been canceled";
- auto st = Status::NotFound(
- "FE not found arrow flight schema, maybe query has been
canceled");
+ std::shared_ptr<arrow::Schema> schema;
+ auto st = ExecEnv::GetInstance()->result_mgr()->find_arrow_schema(
+ UniqueId(request->finst_id()).to_thrift(), &schema);
+ if (!st.ok()) {
st.to_protobuf(result->mutable_status());
return;
}
std::string schema_str;
- auto st = serialize_arrow_schema(&schema, &schema_str);
+ st = serialize_arrow_schema(&schema, &schema_str);
if (st.ok()) {
result->set_schema(std::move(schema_str));
- if (config::public_access_ip != "") {
+ if (!config::public_access_ip.empty() &&
config::public_access_port != -1) {
result->set_be_arrow_flight_ip(config::public_access_ip);
+ result->set_be_arrow_flight_port(config::public_access_port);
}
}
st.to_protobuf(result->mutable_status());
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 4bf09255ffb..c264feab46e 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -77,6 +77,10 @@ public:
void fetch_data(google::protobuf::RpcController* controller, const
PFetchDataRequest* request,
PFetchDataResult* result, google::protobuf::Closure* done)
override;
+ void fetch_arrow_data(google::protobuf::RpcController* controller,
+ const PFetchArrowDataRequest* request,
PFetchArrowDataResult* result,
+ google::protobuf::Closure* done) override;
+
void outfile_write_success(google::protobuf::RpcController* controller,
const POutfileWriteSuccessRequest* request,
POutfileWriteSuccessResult* result,
@@ -282,6 +286,7 @@ private:
// otherwise as light interface
FifoThreadPool _heavy_work_pool;
FifoThreadPool _light_work_pool;
+ FifoThreadPool _arrow_flight_work_pool;
};
} // namespace doris
diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp
index ba6f4adf6c6..ecdd733e76b 100644
--- a/be/src/util/arrow/row_batch.cpp
+++ b/be/src/util/arrow/row_batch.cpp
@@ -165,9 +165,9 @@ Status convert_to_arrow_field(SlotDescriptor* desc,
std::shared_ptr<arrow::Field
return Status::OK();
}
-Status convert_block_arrow_schema(const vectorized::Block& block,
- std::shared_ptr<arrow::Schema>* result,
- const std::string& timezone) {
+Status get_arrow_schema_from_block(const vectorized::Block& block,
+ std::shared_ptr<arrow::Schema>* result,
+ const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (const auto& type_and_name : block) {
std::shared_ptr<arrow::DataType> arrow_type;
@@ -195,9 +195,9 @@ Status convert_to_arrow_schema(const RowDescriptor&
row_desc,
return Status::OK();
}
-Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs&
output_vexpr_ctxs,
- std::shared_ptr<arrow::Schema>* result,
- const std::string& timezone) {
+Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs&
output_vexpr_ctxs,
+ std::shared_ptr<arrow::Schema>* result,
+ const std::string& timezone) {
std::vector<std::shared_ptr<arrow::Field>> fields;
for (int i = 0; i < output_vexpr_ctxs.size(); i++) {
std::shared_ptr<arrow::DataType> arrow_type;
diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h
index 9a33719a1cf..b509275d04a 100644
--- a/be/src/util/arrow/row_batch.h
+++ b/be/src/util/arrow/row_batch.h
@@ -48,13 +48,13 @@ Status convert_to_arrow_type(const TypeDescriptor& type,
std::shared_ptr<arrow::
Status convert_to_arrow_schema(const RowDescriptor& row_desc,
std::shared_ptr<arrow::Schema>* result, const
std::string& timezone);
-Status convert_block_arrow_schema(const vectorized::Block& block,
- std::shared_ptr<arrow::Schema>* result,
- const std::string& timezone);
+Status get_arrow_schema_from_block(const vectorized::Block& block,
+ std::shared_ptr<arrow::Schema>* result,
+ const std::string& timezone);
-Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs&
output_vexpr_ctxs,
- std::shared_ptr<arrow::Schema>* result,
- const std::string& timezone);
+Status get_arrow_schema_from_expr_ctxs(const vectorized::VExprContextSPtrs&
output_vexpr_ctxs,
+ std::shared_ptr<arrow::Schema>* result,
+ const std::string& timezone);
Status serialize_record_batch(const arrow::RecordBatch& record_batch,
std::string* result);
diff --git a/be/src/util/arrow/utils.cpp b/be/src/util/arrow/utils.cpp
index 5ccff849034..742f5bd0fc3 100644
--- a/be/src/util/arrow/utils.cpp
+++ b/be/src/util/arrow/utils.cpp
@@ -33,9 +33,10 @@ Status to_doris_status(const arrow::Status& status) {
}
arrow::Status to_arrow_status(const Status& status) {
- if (status.ok()) {
+ if (LIKELY(status.ok())) {
return arrow::Status::OK();
} else {
+ LOG(WARNING) << status.to_string();
// The length of exception msg returned to the ADBC Client cannot
larger than 8192,
// otherwise ADBC Client will receive:
// `INTERNAL: http2 exception Header size exceeded max allowed size
(8192)`.
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 21e92bb82df..6e27dc73441 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -216,6 +216,11 @@ public:
UIntGauge* heavy_work_max_threads = nullptr;
UIntGauge* light_work_max_threads = nullptr;
+ UIntGauge* arrow_flight_work_pool_queue_size = nullptr;
+ UIntGauge* arrow_flight_work_active_threads = nullptr;
+ UIntGauge* arrow_flight_work_pool_max_queue_size = nullptr;
+ UIntGauge* arrow_flight_work_max_threads = nullptr;
+
UIntGauge* flush_thread_pool_queue_size = nullptr;
UIntGauge* flush_thread_pool_thread_num = nullptr;
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index ca8e1cf3c3b..df38a06b2e3 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -19,21 +19,16 @@
#include "runtime/buffer_control_block.h"
#include "runtime/runtime_state.h"
-#include "util/arrow/block_convertor.h"
-#include "util/arrow/row_batch.h"
+#include "runtime/thread_context.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_context.h"
-namespace doris {
-namespace vectorized {
+namespace doris::vectorized {
-VArrowFlightResultWriter::VArrowFlightResultWriter(
- BufferControlBlock* sinker, const VExprContextSPtrs& output_vexpr_ctxs,
- RuntimeProfile* parent_profile, const std::shared_ptr<arrow::Schema>&
arrow_schema)
- : _sinker(sinker),
- _output_vexpr_ctxs(output_vexpr_ctxs),
- _parent_profile(parent_profile),
- _arrow_schema(arrow_schema) {}
+VArrowFlightResultWriter::VArrowFlightResultWriter(BufferControlBlock* sinker,
+ const VExprContextSPtrs&
output_vexpr_ctxs,
+ RuntimeProfile*
parent_profile)
+ : _sinker(sinker), _output_vexpr_ctxs(output_vexpr_ctxs),
_parent_profile(parent_profile) {}
Status VArrowFlightResultWriter::init(RuntimeState* state) {
_init_profile();
@@ -41,13 +36,11 @@ Status VArrowFlightResultWriter::init(RuntimeState* state) {
return Status::InternalError("sinker is NULL pointer.");
}
_is_dry_run = state->query_options().dry_run_query;
- _timezone_obj = state->timezone_obj();
return Status::OK();
}
void VArrowFlightResultWriter::_init_profile() {
_append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime");
- _convert_tuple_timer = ADD_CHILD_TIMER(_parent_profile,
"TupleConvertTime", "AppendBatchTime");
_result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime",
"AppendBatchTime");
_sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows",
TUnit::UNIT);
_bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent",
TUnit::BYTES);
@@ -66,29 +59,31 @@ Status VArrowFlightResultWriter::write(Block& input_block) {
RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
input_block, &block));
- // convert one batch
- std::shared_ptr<arrow::RecordBatch> result;
- auto num_rows = block.rows();
- // arrow::RecordBatch without `nbytes()` in C++
- uint64_t bytes_sent = block.bytes();
{
- SCOPED_TIMER(_convert_tuple_timer);
- RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema,
arrow::default_memory_pool(),
- &result, _timezone_obj));
- }
- {
- SCOPED_TIMER(_result_send_timer);
- // If this is a dry run task, no need to send data block
- if (!_is_dry_run) {
- status = _sinker->add_arrow_batch(result);
- }
- if (status.ok()) {
- _written_rows += num_rows;
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_sinker->mem_tracker());
+ std::unique_ptr<vectorized::MutableBlock> mutable_block =
+ vectorized::MutableBlock::create_unique(block.clone_empty());
+
RETURN_IF_ERROR(mutable_block->merge_ignore_overflow(std::move(block)));
+ std::shared_ptr<vectorized::Block> output_block =
vectorized::Block::create_shared();
+ output_block->swap(mutable_block->to_block());
+
+ auto num_rows = output_block->rows();
+ // arrow::RecordBatch without `nbytes()` in C++
+ uint64_t bytes_sent = output_block->bytes();
+ {
+ SCOPED_TIMER(_result_send_timer);
+ // If this is a dry run task, no need to send data block
if (!_is_dry_run) {
- _bytes_sent += bytes_sent;
+ status = _sinker->add_arrow_batch(output_block);
+ }
+ if (status.ok()) {
+ _written_rows += num_rows;
+ if (!_is_dry_run) {
+ _bytes_sent += bytes_sent;
+ }
+ } else {
+ LOG(WARNING) << "append result batch to sink failed.";
}
- } else {
- LOG(WARNING) << "append result batch to sink failed.";
}
}
return status;
@@ -104,5 +99,4 @@ Status VArrowFlightResultWriter::close(Status st) {
return Status::OK();
}
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/varrow_flight_result_writer.h
b/be/src/vec/sink/varrow_flight_result_writer.h
index c95d616b617..a968d5e675c 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.h
+++ b/be/src/vec/sink/varrow_flight_result_writer.h
@@ -17,13 +17,6 @@
#pragma once
-#include <arrow/type.h>
-#include <cctz/time_zone.h>
-#include <stddef.h>
-
-#include <memory>
-#include <vector>
-
#include "common/status.h"
#include "runtime/result_writer.h"
#include "util/runtime_profile.h"
@@ -39,12 +32,11 @@ class Block;
class VArrowFlightResultWriter final : public ResultWriter {
public:
VArrowFlightResultWriter(BufferControlBlock* sinker, const
VExprContextSPtrs& output_vexpr_ctxs,
- RuntimeProfile* parent_profile,
- const std::shared_ptr<arrow::Schema>&
arrow_schema);
+ RuntimeProfile* parent_profile);
Status init(RuntimeState* state) override;
- Status write(Block& block) override;
+ Status write(Block& input_block) override;
bool can_sink() override;
@@ -60,8 +52,6 @@ private:
RuntimeProfile* _parent_profile = nullptr; // parent profile from result
sink. not owned
// total time cost on append batch operation
RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
- // tuple convert timer, child timer of _append_row_batch_timer
- RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
// file write timer, child timer of _append_row_batch_timer
RuntimeProfile::Counter* _result_send_timer = nullptr;
// number of sent rows
@@ -72,10 +62,6 @@ private:
bool _is_dry_run = false;
uint64_t _bytes_sent = 0;
-
- std::shared_ptr<arrow::Schema> _arrow_schema;
-
- cctz::time_zone _timezone_obj;
};
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/vmemory_scratch_sink.cpp
b/be/src/vec/sink/vmemory_scratch_sink.cpp
index 904af9a9f1b..1d270f7beaf 100644
--- a/be/src/vec/sink/vmemory_scratch_sink.cpp
+++ b/be/src/vec/sink/vmemory_scratch_sink.cpp
@@ -88,7 +88,7 @@ Status MemoryScratchSink::send(RuntimeState* state, Block*
input_block, bool eos
*input_block, &block));
std::shared_ptr<arrow::Schema> block_arrow_schema;
// After expr executed, use recaculated schema as final schema
- RETURN_IF_ERROR(convert_block_arrow_schema(block, &block_arrow_schema,
state->timezone()));
+ RETURN_IF_ERROR(get_arrow_schema_from_block(block, &block_arrow_schema,
state->timezone()));
RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema,
arrow::default_memory_pool(),
&result, _timezone_obj));
_queue->blocking_put(result);
diff --git a/be/src/vec/sink/vresult_file_sink.cpp
b/be/src/vec/sink/vresult_file_sink.cpp
index 035a76a2f7f..6ecaaff18b0 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -79,7 +79,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), _buf_size, &_sender,
state->enable_pipeline_exec(),
- state->execution_timeout()));
+ state));
// create writer
_writer.reset(new (std::nothrow) VFileResultWriter(
_file_opts.get(), _storage_type,
state->fragment_instance_id(), _output_vexpr_ctxs,
diff --git a/be/src/vec/sink/vresult_sink.cpp b/be/src/vec/sink/vresult_sink.cpp
index dc63fdb4be6..af91e1f9d50 100644
--- a/be/src/vec/sink/vresult_sink.cpp
+++ b/be/src/vec/sink/vresult_sink.cpp
@@ -90,7 +90,7 @@ Status VResultSink::prepare(RuntimeState* state) {
// create sender
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
state->fragment_instance_id(), _buf_size, &_sender,
state->enable_pipeline_exec(),
- state->execution_timeout()));
+ state));
// create writer based on sink type
switch (_sink_type) {
@@ -106,12 +106,11 @@ Status VResultSink::prepare(RuntimeState* state) {
}
case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
std::shared_ptr<arrow::Schema> arrow_schema;
- RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs,
&arrow_schema,
- state->timezone()));
-
state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
- arrow_schema);
+ RETURN_IF_ERROR(get_arrow_schema_from_expr_ctxs(_output_vexpr_ctxs,
&arrow_schema,
+ state->timezone()));
+ _sender->register_arrow_schema(arrow_schema);
_writer.reset(new (std::nothrow)
VArrowFlightResultWriter(_sender.get(), _output_vexpr_ctxs,
- _profile,
arrow_schema));
+ _profile));
break;
}
default:
diff --git a/be/test/runtime/result_buffer_mgr_test.cpp
b/be/test/runtime/result_buffer_mgr_test.cpp
index 152c155ef0a..4ab9186c5fa 100644
--- a/be/test/runtime/result_buffer_mgr_test.cpp
+++ b/be/test/runtime/result_buffer_mgr_test.cpp
@@ -34,6 +34,7 @@ protected:
virtual void SetUp() {}
private:
+ RuntimeState _state;
};
TEST_F(ResultBufferMgrTest, create_normal) {
@@ -43,7 +44,7 @@ TEST_F(ResultBufferMgrTest, create_normal) {
query_id.hi = 100;
std::shared_ptr<BufferControlBlock> control_block1;
- EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
false).ok());
+ EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
&_state).ok());
}
TEST_F(ResultBufferMgrTest, create_same_buffer) {
@@ -53,9 +54,9 @@ TEST_F(ResultBufferMgrTest, create_same_buffer) {
query_id.hi = 100;
std::shared_ptr<BufferControlBlock> control_block1;
- EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
false).ok());
+ EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
&_state).ok());
std::shared_ptr<BufferControlBlock> control_block2;
- EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2,
false).ok());
+ EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block2,
&_state).ok());
EXPECT_EQ(control_block1.get(), control_block1.get());
}
@@ -67,7 +68,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_normal) {
query_id.hi = 100;
std::shared_ptr<BufferControlBlock> control_block1;
- EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
false).ok());
+ EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
&_state).ok());
TFetchDataResult* result = new TFetchDataResult();
result->result_batch.rows.push_back("hello test");
@@ -85,7 +86,7 @@ TEST_F(ResultBufferMgrTest, fetch_data_no_block) {
query_id.hi = 100;
std::shared_ptr<BufferControlBlock> control_block1;
- EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
false).ok());
+ EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
&_state).ok());
TFetchDataResult* result = new TFetchDataResult();
query_id.lo = 11;
@@ -101,7 +102,7 @@ TEST_F(ResultBufferMgrTest, normal_cancel) {
query_id.hi = 100;
std::shared_ptr<BufferControlBlock> control_block1;
- EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
false).ok());
+ EXPECT_TRUE(buffer_mgr.create_sender(query_id, 1024, &control_block1,
&_state).ok());
EXPECT_TRUE(buffer_mgr.cancel(query_id).ok());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
index b0367e8c578..6f45f3faac7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java
@@ -27,6 +27,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
+import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Preconditions;
import com.google.protobuf.Any;
@@ -224,19 +225,36 @@ public class DorisFlightSqlProducer implements
FlightSqlProducer, AutoCloseable
}
} else {
// Now only query stmt will pull results from BE.
- final ByteString handle = ByteString.copyFromUtf8(
- DebugUtil.printId(connectContext.getFinstId()) +
":" + query);
Schema schema =
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow
flight schema is null")
.toRuntimeException();
}
+
+ TUniqueId queryId = connectContext.getFinstId();
+ // Ticket contains the IP and Brpc Port of the Doris BE
node where the query result is located.
+ final ByteString handle = ByteString.copyFromUtf8(
+ DebugUtil.printId(queryId) + "&" +
connectContext.getResultInternalServiceAddr().hostname
+ + "&" +
connectContext.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement =
TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
Ticket ticket = new
Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
- Location location =
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
- connectContext.getResultFlightServerAddr().port);
+ Location location;
+ if
(flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
+ // In a production environment, it is often
inconvenient to expose Doris BE nodes
+ // to the external network.
+ // However, a reverse proxy (such as nginx) can be
added to all Doris BE nodes,
+ // and the external client will be randomly routed to
a Doris BE node when connecting to nginx.
+ // The query results of Arrow Flight SQL will be
randomly saved on a Doris BE node.
+ // If it is different from the Doris BE node randomly
routed by nginx,
+ // data forwarding needs to be done inside the Doris
BE node.
+ location =
Location.forGrpcInsecure(flightSQLConnectProcessor.getPublicAccessAddr().hostname,
+
flightSQLConnectProcessor.getPublicAccessAddr().port);
+ } else {
+ location =
Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
+
connectContext.getResultFlightServerAddr().port);
+ }
List<FlightEndpoint> endpoints =
Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will
not callback.
return new FlightInfo(schema, descriptor, endpoints, -1,
-1);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
index a816cf184ca..6724065f99a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/FlightSqlConnectProcessor.java
@@ -53,11 +53,12 @@ import java.util.concurrent.TimeoutException;
/**
* Process one flgiht sql connection.
- *
+ * <p>
* Must use try-with-resources.
*/
public class FlightSqlConnectProcessor extends ConnectProcessor implements
AutoCloseable {
private static final Logger LOG =
LogManager.getLogger(FlightSqlConnectProcessor.class);
+ private TNetworkAddress publicAccessAddr = new TNetworkAddress();
public FlightSqlConnectProcessor(ConnectContext context) {
super(context);
@@ -66,6 +67,10 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
context.setReturnResultFromLocal(true);
}
+ public TNetworkAddress getPublicAccessAddr() {
+ return publicAccessAddr;
+ }
+
public void prepare(MysqlCommand command) {
// set status of query to OK.
ctx.getState().reset();
@@ -123,11 +128,12 @@ public class FlightSqlConnectProcessor extends
ConnectProcessor implements AutoC
}
Status resultStatus = new Status(pResult.getStatus());
if (resultStatus.getErrorCode() != TStatusCode.OK) {
- throw new RuntimeException(String.format("fetch arrow flight
schema failed, finstId: %s, errmsg: %s",
- DebugUtil.printId(tid), resultStatus.toString()));
+ throw new RuntimeException(String.format("fetch arrow flight
schema failed, queryId: %s, errmsg: %s",
+ DebugUtil.printId(tid), resultStatus));
}
- if (pResult.hasBeArrowFlightIp()) {
- ctx.getResultFlightServerAddr().hostname =
pResult.getBeArrowFlightIp().toStringUtf8();
+ if (pResult.hasBeArrowFlightIp() &&
pResult.hasBeArrowFlightPort()) {
+ publicAccessAddr.hostname =
pResult.getBeArrowFlightIp().toStringUtf8();
+ publicAccessAddr.port = pResult.getBeArrowFlightPort();
}
if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
RootAllocator rootAllocator = new
RootAllocator(Integer.MAX_VALUE);
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index c2ec3205541..9c993bb9a93 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -262,6 +262,20 @@ message PFetchDataResult {
optional bool empty_batch = 6;
};
+message PFetchArrowDataRequest {
+ optional PUniqueId finst_id = 1;
+};
+
+message PFetchArrowDataResult {
+ optional PStatus status = 1;
+ // valid when status is ok
+ optional int64 packet_seq = 2;
+ optional bool eos = 3;
+ optional PBlock block = 4;
+ optional bool empty_batch = 5;
+ optional string timezone = 6;
+};
+
message PFetchArrowFlightSchemaRequest {
optional PUniqueId finst_id = 1;
};
@@ -271,6 +285,7 @@ message PFetchArrowFlightSchemaResult {
// valid when status is ok
optional bytes schema = 2;
optional bytes be_arrow_flight_ip = 3;
+ optional int32 be_arrow_flight_port = 4;
};
message KeyTuple {
@@ -922,6 +937,7 @@ service PBackendService {
rpc exec_plan_fragment_start(PExecPlanFragmentStartRequest) returns
(PExecPlanFragmentResult);
rpc cancel_plan_fragment(PCancelPlanFragmentRequest) returns
(PCancelPlanFragmentResult);
rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
+ rpc fetch_arrow_data(PFetchArrowDataRequest) returns
(PFetchArrowDataResult);
rpc tablet_writer_open(PTabletWriterOpenRequest) returns
(PTabletWriterOpenResult);
rpc open_load_stream(POpenLoadStreamRequest) returns
(POpenLoadStreamResponse);
rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns
(PTabletWriterAddBlockResult);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]