Gabriel39 commented on code in PR #9968:
URL: https://github.com/apache/incubator-doris/pull/9968#discussion_r890105271


##########
be/src/vec/exec/vbroker_scan_node.h:
##########
@@ -43,15 +54,64 @@ class VBrokerScanNode final : public BrokerScanNode {
     // Close the scanner, and report errors.
     Status close(RuntimeState* state) override;
 
+    // No use
+    Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+
 private:
-    Status start_scanners() override;
+    // Write debug string of this into out.
+    void debug_string(int indentation_level, std::stringstream* out) const 
override;
+
+    // Update process status to one failed status,
+    // NOTE: Must hold the mutex of this scan node
+    bool update_status(const Status& new_status) {
+        if (process_status_.ok()) {
+            process_status_ = new_status;
+            return true;
+        }
+        return false;
+    }
+
+    std::unique_ptr<BaseScanner> create_scanner(const TBrokerScanRange& 
scan_range,
+                                                ScannerCounter* counter);
+
+    Status start_scanners();
 
     void scanner_worker(int start_idx, int length);
     // Scan one range
     Status scanner_scan(const TBrokerScanRange& scan_range, ScannerCounter* 
counter);
 
-    std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
-    std::unique_ptr<MutableBlock> _mutable_block;
+    TupleId tuple_id_;

Review Comment:
   This node inherits from scan node and is independent from BrokerScanNode



##########
be/src/vec/exec/vbroker_scan_node.cpp:
##########
@@ -26,107 +26,168 @@
 #include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "util/types.h"
+#include "vec/exec/vbroker_scanner.h"
+#include "vec/exec/vjson_scanner.h"
+#include "vec/exec/vorc_scanner.h"
+#include "vec/exec/vparquet_scanner.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris::vectorized {
 
 VBrokerScanNode::VBrokerScanNode(ObjectPool* pool, const TPlanNode& tnode,
                                  const DescriptorTbl& descs)
-        : BrokerScanNode(pool, tnode, descs) {
-    _vectorized = true;
+        : ScanNode(pool, tnode, descs),
+          tuple_id_(tnode.broker_scan_node.tuple_id),
+          runtime_state_(nullptr),
+          tuple_desc_(nullptr),
+          num_running_scanners_(0),
+          scan_finished_(false),
+          max_buffered_batches_(32),
+          wait_scanner_timer_(nullptr) {}
+
+Status VBrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
+    RETURN_IF_ERROR(ScanNode::init(tnode, state));
+    auto& broker_scan_node = tnode.broker_scan_node;
+
+    if (broker_scan_node.__isset.pre_filter_exprs) {
+        pre_filter_texprs_ = broker_scan_node.pre_filter_exprs;
+    }
+
+    return Status::OK();
+}
+
+Status VBrokerScanNode::prepare(RuntimeState* state) {
+    VLOG_QUERY << "BrokerScanNode prepare";
+    RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    // get tuple desc
+    runtime_state_ = state;
+    tuple_desc_ = state->desc_tbl().get_tuple_descriptor(tuple_id_);
+    if (tuple_desc_ == nullptr) {
+        std::stringstream ss;
+        ss << "Failed to get tuple descriptor, tuple_id_=" << tuple_id_;
+        return Status::InternalError(ss.str());
+    }
+
+    // Initialize slots map
+    for (auto slot : tuple_desc_->slots()) {
+        auto pair = slots_map_.emplace(slot->col_name(), slot);
+        if (!pair.second) {
+            std::stringstream ss;
+            ss << "Failed to insert slot, col_name=" << slot->col_name();
+            return Status::InternalError(ss.str());
+        }
+    }
+
+    // Profile
+    wait_scanner_timer_ = ADD_TIMER(runtime_profile(), "WaitScannerTime");
+
+    return Status::OK();
+}
+
+Status VBrokerScanNode::open(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    RETURN_IF_ERROR(ExecNode::open(state));
+    RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
+    RETURN_IF_CANCELLED(state);
+
+    RETURN_IF_ERROR(start_scanners());
+
+    return Status::OK();
 }
 
 Status VBrokerScanNode::start_scanners() {
     {
-        std::unique_lock<std::mutex> l(_batch_queue_lock);
-        _num_running_scanners = 1;
+        std::unique_lock<std::mutex> l(batch_queue_lock_);

Review Comment:
   I think we have discussed this convention in WeChat group. I don't plan to 
modify all variables now but it's better to use this convention in future



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to