xinyiZzz commented on code in PR #48368:
URL: https://github.com/apache/doris/pull/48368#discussion_r1975215603


##########
be/src/vec/sink/varrow_flight_result_writer.h:
##########
@@ -18,21 +18,76 @@
 #pragma once
 
 #include "common/status.h"
+#include "runtime/result_block_buffer.h"
 #include "runtime/result_writer.h"
 #include "util/runtime_profile.h"
 #include "vec/exprs/vexpr_fwd.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
-class BufferControlBlock;
 class RuntimeState;
+class PFetchArrowDataResult;
 
 namespace vectorized {
 class Block;
 
+class GetArrowResultBatchCtx {
+public:
+    using ResultType = vectorized::Block;
+    ENABLE_FACTORY_CREATOR(GetArrowResultBatchCtx)
+    GetArrowResultBatchCtx(PFetchArrowDataResult* result) : _result(result) {}
+#ifdef BE_TEST
+    GetArrowResultBatchCtx() = default;
+#endif
+    MOCK_FUNCTION ~GetArrowResultBatchCtx() = default;
+
+    MOCK_FUNCTION void on_failure(const Status& status);
+    MOCK_FUNCTION void on_close(int64_t packet_seq, int64_t /* returned_rows 
*/);
+    MOCK_FUNCTION Status on_data(const std::shared_ptr<vectorized::Block>& 
block,
+                                 const int64_t packet_seq, 
ResultBlockBufferBase* buffer);
+
+private:
+#ifndef BE_TEST
+    const int32_t _max_msg_size = std::numeric_limits<int32_t>::max();
+#else
+    int32_t _max_msg_size = std::numeric_limits<int32_t>::max();
+#endif
+    PFetchArrowDataResult* _result = nullptr;
+};
+
+class ArrowFlightResultBlockBuffer final : public 
ResultBlockBuffer<GetArrowResultBatchCtx> {
+public:
+    using ResultType = vectorized::Block;
+    ArrowFlightResultBlockBuffer(TUniqueId id, RuntimeState* state,
+                                 std::shared_ptr<arrow::Schema> schema, int 
buffer_size)
+            : ResultBlockBuffer<GetArrowResultBatchCtx>(id, state, 
buffer_size),
+              _arrow_schema(schema),
+              _profile("ResultBlockBuffer " + print_id(_fragment_id)),
+              _timezone_obj(state->timezone_obj()) {
+        _serialize_batch_ns_timer = ADD_TIMER(&_profile, 
"SerializeBatchNsTime");
+        _uncompressed_bytes_counter = ADD_COUNTER(&_profile, 
"UncompressedBytes", TUnit::BYTES);
+        _compressed_bytes_counter = ADD_COUNTER(&_profile, "CompressedBytes", 
TUnit::BYTES);
+    }
+    ~ArrowFlightResultBlockBuffer() override = default;
+    Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result);

Review Comment:
   这个 `get_arrow_batch` 能否改个更好的名字,和 `ResultBlockBuffer::get_batch` 的函数名区分开?或者就叫 `get_batch`,通过注释来区分。
   
   `get_arrow_batch` 实际取的是 block,由当前 be 的 arrow flight server 调用;
   `ResultBlockBuffer::get_batch` 实际取的是 arrow batch,由其他 be 通过 brpc 调用。



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to