This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new e66ffc1b6d4 [branch-2.1](arrow-flight-sql) Fix pipelineX Unknown
result sink type (#37540)
e66ffc1b6d4 is described below
commit e66ffc1b6d44e3d2b7d35584a564fa84ae569844
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Jul 11 12:30:46 2024 +0800
[branch-2.1](arrow-flight-sql) Fix pipelineX Unknown result sink type
(#37540)
pick #35804
---
be/src/pipeline/exec/result_sink_operator.cpp | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp
index ec54e1bdd5c..624b9ca192d 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -25,8 +25,10 @@
#include "runtime/buffer_control_block.h"
#include "runtime/exec_env.h"
#include "runtime/result_buffer_mgr.h"
+#include "util/arrow/row_batch.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
+#include "vec/sink/varrow_flight_result_writer.h"
#include "vec/sink/vmysql_result_writer.h"
#include "vec/sink/vresult_sink.h"
@@ -80,7 +82,7 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
}
// create writer based on sink type
switch (p._sink_type) {
- case TResultSinkType::MYSQL_PROTOCAL:
+ case TResultSinkType::MYSQL_PROTOCAL: {
if (state->mysql_row_binary_format()) {
_writer.reset(new (std::nothrow)
vectorized::VMysqlResultWriter<true>(
_sender.get(), _output_vexpr_ctxs, _profile));
@@ -89,6 +91,16 @@ Status ResultSinkLocalState::open(RuntimeState* state) {
_sender.get(), _output_vexpr_ctxs, _profile));
}
break;
+ }
+ case TResultSinkType::ARROW_FLIGHT_PROTOCAL: {
+ std::shared_ptr<arrow::Schema> arrow_schema;
+ RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema));
+ state->exec_env()->result_mgr()->register_arrow_schema(state->fragment_instance_id(),
+ arrow_schema);
+ _writer.reset(new (std::nothrow) vectorized::VArrowFlightResultWriter(
+ _sender.get(), _output_vexpr_ctxs, _profile, arrow_schema));
+ break;
+ }
default:
return Status::InternalError("Unknown result sink type");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]