zclllyybb commented on code in PR #48720:
URL: https://github.com/apache/doris/pull/48720#discussion_r2681123177


##########
be/src/pipeline/shuffle/writer.cpp:
##########
@@ -17,100 +17,235 @@
 
 #include "writer.h"
 
-#include <type_traits>
+#include <glog/logging.h>
 
+#include <algorithm>
+#include <cstdint>
+
+#include "common/logging.h"
+#include "common/status.h"
 #include "pipeline/exec/exchange_sink_operator.h"
 #include "vec/core/block.h"
+#include "vec/sink/tablet_sink_hash_partitioner.h"
 
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
+
 template <typename ChannelPtrType>
-Status Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType 
channel, Status st) const {
+Status WriterBase::_handle_eof_channel(RuntimeState* state, ChannelPtrType 
channel,
+                                       Status st) const {
     channel->set_receiver_eof(st);
-    // Chanel will not send RPC to the downstream when eof, so close chanel by 
OK status.
+    // Chanel will not send RPC to the downstream when eof, so close channel 
by OK status.
     return channel->close(state);
 }
 
-Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state,
-                     vectorized::Block* block, bool eos) {
-    bool already_sent = false;
+// NOLINTBEGIN(readability-function-cognitive-complexity)
+Status WriterBase::_add_rows_impl(RuntimeState* state,
+                                  
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
+                                  size_t channel_count, vectorized::Block* 
block, bool eos) {
+    Status status = Status::OK();
+    uint32_t offset = 0;
+    for (size_t i = 0; i < channel_count; ++i) {
+        uint32_t size = _channel_rows_histogram[i];
+        if (!channels[i]->is_receiver_eof() && size > 0) {
+            VLOG_DEBUG << fmt::format("partition {} of {}, block:\n{}, start: 
{}, size: {}", i,
+                                      channel_count, block->dump_data(), 
offset, size);
+            status = channels[i]->add_rows(block, _origin_row_idx.data(), 
offset, size, false);
+            HANDLE_CHANNEL_STATUS(state, channels[i], status);
+        }
+        offset += size;
+    }
+    if (eos) {
+        for (int i = 0; i < channel_count; ++i) {
+            if (!channels[i]->is_receiver_eof()) {
+                VLOG_DEBUG << fmt::format("EOS partition {} of {}, 
block:\n{}", i, channel_count,
+                                          block->dump_data());
+                status = channels[i]->add_rows(block, _origin_row_idx.data(), 
0, 0, true);
+                HANDLE_CHANNEL_STATUS(state, channels[i], status);
+            }
+        }
+    }
+    return Status::OK();
+}
+// NOLINTEND(readability-function-cognitive-complexity)
+
+Status OlapWriter::write(ExchangeSinkLocalState* local_state, RuntimeState* 
state,
+                         vectorized::Block* block, bool eos) {
+    Status st = _write_normal(local_state, state, block);
+    // auto partition's batched block cut in line. send this unprocessed block 
again.
+    if (st.is<ErrorCode::NEED_SEND_AGAIN>()) {
+        RETURN_IF_ERROR(_write_normal(local_state, state, block));
+    } else if (!st.ok()) {
+        return st;
+    }
+    // the block is already processed normally. in `_write_last` we only need 
to consider batched rows.
+    if (eos) {
+        vectorized::Block empty_block = block->clone_empty();
+        RETURN_IF_ERROR(_write_last(local_state, state, &empty_block));

Review Comment:
   try_cut_in_line 拿到外面,去掉 _write_last
   (Move `try_cut_in_line` outside, and remove `_write_last`.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to