This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 50f66b1246 [fix](pipeline) fix bug of datastream sender when doing 
BUCKET_SHFFULE_HASH_PARTITIONED shuffle (#22988)
50f66b1246 is described below

commit 50f66b12462bcf693259a43594f253a5c1ca3626
Author: TengJianPing <[email protected]>
AuthorDate: Tue Aug 15 17:30:27 2023 +0800

    [fix](pipeline) fix bug of datastream sender when doing 
BUCKET_SHFFULE_HASH_PARTITIONED shuffle (#22988)
    
    This issue is introduced by #22765, if #22765 is picked to 2.0, then also 
need to pick this PR.
    
    When shuffle type is BUCKET_SHFFULE_HASH_PARTITIONED, since data of multi 
buckets maybe sent to the same channel, send eos too early may cause data lost.
---
 be/src/vec/sink/vdata_stream_sender.h | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 474e1fba45..9406d7524c 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -408,9 +408,18 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* 
state, Channels& channe
 
     Status status;
     for (int i = 0; i < num_channels; ++i) {
-        if (!channels[i]->is_receiver_eof() && (!channel2rows[i].empty() || 
eos)) {
-            status = channels[i]->add_rows(block, channel2rows[i], eos);
+        if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
+            status = channels[i]->add_rows(block, channel2rows[i], false);
             HANDLE_CHANNEL_STATUS(state, channels[i], status);
+            channel2rows[i].clear();
+        }
+    }
+    if (eos) {
+        for (int i = 0; i < num_channels; ++i) {
+            if (!channels[i]->is_receiver_eof()) {
+                status = channels[i]->add_rows(block, channel2rows[i], true);
+                HANDLE_CHANNEL_STATUS(state, channels[i], status);
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to