HappenLee commented on code in PR #17187:
URL: https://github.com/apache/doris/pull/17187#discussion_r1133374909


##########
be/src/vec/exec/scan/pip_scanner_context.h:
##########
@@ -33,20 +33,68 @@ class PipScannerContext : public vectorized::ScannerContext 
{
             : vectorized::ScannerContext(state, parent, input_tuple_desc, 
output_tuple_desc,
                                          scanners, limit, 
max_bytes_in_blocks_queue) {}
 
-    void _update_block_queue_empty() override { _blocks_queue_empty = 
_blocks_queue.empty(); }
+    Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* 
block, bool* eos,
+                                int id, bool wait = false) override {
+        {
+            std::unique_lock<std::mutex> l(_transfer_lock);
+            if (state->is_cancelled()) {
+                _process_status = Status::Cancelled("cancelled");
+            }
 
-    Status get_block_from_queue(vectorized::BlockUPtr* block, bool* eos,
-                                bool wait = false) override {
-        return vectorized::ScannerContext::get_block_from_queue(block, eos, 
false);
+            if (!_process_status.ok()) {
+                return _process_status;
+            }
+        }
+
+        {
+            std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
+            if (!_blocks_queues[id].empty()) {
+                *block = std::move(_blocks_queues[id].front());
+                _blocks_queues[id].pop_front();
+                return Status::OK();
+            } else {
+                *eos = _is_finished || _should_stop;
+            }
+        }
+        return Status::OK();
     }
 
     // We should make those method lock free.
     bool done() override { return _is_finished || _should_stop || 
_status_error; }
-    bool no_schedule() override { return _num_running_scanners == 0 && 
_num_scheduling_ctx == 0; }
-    bool empty_in_queue() override { return _blocks_queue_empty; }
+
+    void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) 
override {
+        const int queue_size = _queue_mutexs.size();
+        const int block_size = blocks.size();
+        for (int i = 0; i < queue_size && i < block_size; ++i) {
+            int queue = _next_queue_to_feed;
+            {
+                std::lock_guard<std::mutex> l(*_queue_mutexs[queue]);
+                for (int j = i; j < block_size; j += queue_size) {
+                    _blocks_queues[queue].emplace_back(std::move(blocks[j]));
+                }
+            }
+            _next_queue_to_feed = queue + 1 < queue_size ? queue + 1 : 0;
+        }
+    }
+
+    bool empty_in_queue(int id) override {
+        std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
+        return _blocks_queues[id].empty();
+    }
+
+    void set_max_queue_size(int max_queue_size) override {
+        for (int i = 0; i < max_queue_size; ++i) {
+            _blocks_queue_empty.emplace_back(true);
+            _queue_mutexs.emplace_back(new std::mutex);
+            _blocks_queues.emplace_back(std::list<vectorized::BlockUPtr>());
+        }
+    }
 
 private:
-    std::atomic_bool _blocks_queue_empty = true;
+    int _next_queue_to_feed = 0;
+    std::vector<bool> _blocks_queue_empty;

Review Comment:
   Will remove this in the next PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to