github-actions[bot] commented on code in PR #34541:
URL: https://github.com/apache/doris/pull/34541#discussion_r1609515633


##########
be/src/vec/runtime/vdata_stream_recvr.cpp:
##########
@@ -418,12 +425,29 @@ Status VDataStreamRecvr::add_block(const PBlock& pblock, 
int sender_id, int be_n
                                    int64_t packet_seq, 
::google::protobuf::Closure** done) {
     SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id);
     int use_sender_id = _is_merging ? sender_id : 0;
-    return _sender_queues[use_sender_id]->add_block(pblock, be_number, 
packet_seq, done);
+    int64_t current_pblock_rows = 0;
+    RETURN_IF_ERROR(_sender_queues[use_sender_id]->add_block(pblock, 
be_number, packet_seq, done,
+                                                             
&current_pblock_rows));
+    _queue_total_rows[use_sender_id] = _queue_total_rows[use_sender_id] + 
current_pblock_rows;
+    return Status::OK();
 }
 
-void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
+int64_t VDataStreamRecvr::add_block(Block* block, int sender_id, bool 
use_move) {

Review Comment:
   warning: method 'add_block' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   int64_t VDataStreamRecvr::add_block(Block* block, int sender_id, bool 
use_move) const {
   ```
   



##########
be/src/vec/runtime/vdata_stream_recvr.cpp:
##########
@@ -418,12 +425,29 @@
                                    int64_t packet_seq, 
::google::protobuf::Closure** done) {
     SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id);
     int use_sender_id = _is_merging ? sender_id : 0;
-    return _sender_queues[use_sender_id]->add_block(pblock, be_number, 
packet_seq, done);
+    int64_t current_pblock_rows = 0;
+    RETURN_IF_ERROR(_sender_queues[use_sender_id]->add_block(pblock, 
be_number, packet_seq, done,
+                                                             
&current_pblock_rows));
+    _queue_total_rows[use_sender_id] = _queue_total_rows[use_sender_id] + 
current_pblock_rows;
+    return Status::OK();
 }
 
-void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
+int64_t VDataStreamRecvr::add_block(Block* block, int sender_id, bool 
use_move) {
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->add_block(block, use_move);
+    _queue_total_rows[use_sender_id] = _queue_total_rows[use_sender_id] + 
block->rows();
+}
+
+// the sink could eos early. when sink rows have reached limit for all queue, 
and no conjuncts to filters data.
+// in this way, we could eos sink as soon as possible, so could reduce sender 
total rows.
+bool VDataStreamRecvr::could_eos_sink() {

Review Comment:
   warning: method 'could_eos_sink' can be made const 
[readability-make-member-function-const]
   
   ```suggestion
   bool VDataStreamRecvr::could_eos_sink() const {
   ```
   
   be/src/vec/runtime/vdata_stream_recvr.h:117:
   ```diff
   -     bool could_eos_sink();
   +     bool could_eos_sink() const;
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to