yiguolei commented on code in PR #48368:
URL: https://github.com/apache/doris/pull/48368#discussion_r1976233114


##########
be/src/vec/sink/varrow_flight_result_writer.cpp:
##########
@@ -26,16 +30,122 @@
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 
-VArrowFlightResultWriter::VArrowFlightResultWriter(BufferControlBlock* sinker,
+void GetArrowResultBatchCtx::on_failure(const Status& status) {
+    DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
+    status.to_protobuf(_result->mutable_status());
+}
+
+void GetArrowResultBatchCtx::on_close(int64_t packet_seq, int64_t /* 
returned_rows */) {
+    Status status;
+    status.to_protobuf(_result->mutable_status());
+    _result->set_packet_seq(packet_seq);
+    _result->set_eos(true);
+}
+
+Status GetArrowResultBatchCtx::on_data(const 
std::shared_ptr<vectorized::Block>& block,
+                                       const int64_t packet_seq, 
ResultBlockBufferBase* buffer) {
+    if (_result != nullptr) {
+        auto* arrow_buffer = 
assert_cast<ArrowFlightResultBlockBuffer*>(buffer);
+        size_t uncompressed_bytes = 0, compressed_bytes = 0;
+        SCOPED_TIMER(arrow_buffer->_serialize_batch_ns_timer);
+        RETURN_IF_ERROR(block->serialize(
+                arrow_buffer->_be_exec_version, _result->mutable_block(), 
&uncompressed_bytes,
+                &compressed_bytes, 
arrow_buffer->_fragment_transmission_compression_type, false));
+        COUNTER_UPDATE(arrow_buffer->_uncompressed_bytes_counter, 
uncompressed_bytes);
+        COUNTER_UPDATE(arrow_buffer->_compressed_bytes_counter, 
compressed_bytes);
+        _result->set_packet_seq(packet_seq);
+        _result->set_eos(false);
+        if (packet_seq == 0) {
+            _result->set_timezone(arrow_buffer->_timezone);
+        }
+    } else {
+        _result->set_empty_batch(true);
+        _result->set_packet_seq(packet_seq);
+        _result->set_eos(false);
+    }
+    Status st = Status::OK();
+    /// The size limit of proto buffer message is 2G
+    if (_result->ByteSizeLong() > _max_msg_size) {
+        st = Status::InternalError("Message size exceeds 2GB: {}", 
_result->ByteSizeLong());
+        _result->clear_block();
+    }
+    st.to_protobuf(_result->mutable_status());
+    return Status::OK();
+}
+
+Status 
ArrowFlightResultBlockBuffer::find_schema(std::shared_ptr<arrow::Schema>* 
arrow_schema) {

Review Comment:
   Please rename this method to `get_schema` rather than `find_schema`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to