This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 832338c55e [improvement] set name for scanner threads and fix compile 
error in clang (#9336)
832338c55e is described below

commit 832338c55ee70a70b690a77a07e1f4115dd7a9c2
Author: dataroaring <[email protected]>
AuthorDate: Thu May 5 09:53:43 2022 +0800

    [improvement] set name for scanner threads and fix compile error in clang 
(#9336)
---
 be/src/exec/base_scanner.h            | 2 +-
 be/src/exec/broker_scan_node.cpp      | 4 +++-
 be/src/exec/broker_scanner.h          | 3 ++-
 be/src/exec/olap_scan_node.cpp        | 1 +
 be/src/util/thread.cpp                | 4 ++++
 be/src/util/thread.h                  | 2 ++
 be/src/vec/exec/vbroker_scan_node.cpp | 4 +++-
 be/src/vec/exec/vbroker_scan_node.h   | 7 ++++++-
 be/src/vec/exec/vbroker_scanner.h     | 9 +++++++--
 9 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 8b97a97a3f..13285ab6aa 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -63,7 +63,7 @@ public:
 
     // Get next block
     virtual Status get_next(std::vector<vectorized::MutableColumnPtr>& 
columns, bool* eof) {
-        return Status::NotSupported("Not Implemented get block");
+        return Status::NotSupported("Not Implemented get next");
     }
 
     // Close this scanner
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 4ae286e944..e1dbd255ec 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -31,6 +31,7 @@
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
+#include "util/thread.h"
 
 namespace doris {
 
@@ -256,6 +257,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& 
scan_range,
                                     const std::vector<ExprContext*>& 
conjunct_ctxs,
                                     ScannerCounter* counter) {
     //create scanner object and open
+    Thread::set_self_name("broker_scanner");
     std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, counter);
     RETURN_IF_ERROR(scanner->open());
     bool scanner_eof = false;
@@ -392,4 +394,4 @@ void BrokerScanNode::scanner_worker(int start_idx, int 
length) {
     Expr::close(scanner_expr_ctxs, _runtime_state);
 }
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 7906b98ff4..344d3789ae 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -62,7 +62,8 @@ public:
     Status open() override;
 
     // Get next tuple
-    Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* 
fill_tuple) override;
+    virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof,
+                            bool* fill_tuple) override;
 
     Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool* 
eof) override {
         return Status::NotSupported("Not Implemented get columns");
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index e17e26bcd9..61befd7eda 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1527,6 +1527,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
 void OlapScanNode::scanner_thread(OlapScanner* scanner) {
     SCOPED_ATTACH_TASK_THREAD(_runtime_state, mem_tracker());
     ADD_THREAD_LOCAL_MEM_TRACKER(scanner->mem_tracker());
+    Thread::set_self_name("olap_scanner");
     if (UNLIKELY(_transfer_done)) {
         _scanner_done = true;
         std::unique_lock<std::mutex> l(_scan_batches_lock);
diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp
index 64e7a50142..3be6d8b0e4 100644
--- a/be/src/util/thread.cpp
+++ b/be/src/util/thread.cpp
@@ -258,6 +258,10 @@ Thread::~Thread() {
     }
 }
 
+void Thread::set_self_name(const std::string& name) {
+    ThreadMgr::set_thread_name(name, current_thread_id());
+}
+
 void Thread::join() {
     ThreadJoiner(this).join();
 }
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index dd2a96f4b5..0c4327a65d 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -88,6 +88,8 @@ public:
         return start_thread(category, name, std::bind(f, a1, a2, a3, a4, a5, 
a6), NO_FLAGS, holder);
     }
 
+    static void set_self_name(const std::string& name);
+
     ~Thread();
 
     // Blocks until this thread finishes execution. Once this method returns, 
the thread
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp 
b/be/src/vec/exec/vbroker_scan_node.cpp
index c1d597824c..d12f3e8d85 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -24,6 +24,7 @@
 #include "runtime/tuple_row.h"
 #include "runtime/mem_tracker.h"
 #include "util/runtime_profile.h"
+#include "util/thread.h"
 #include "util/types.h"
 #include "vec/exprs/vexpr_context.h"
 
@@ -192,6 +193,7 @@ Status VBrokerScanNode::scanner_scan(const 
TBrokerScanRange& scan_range, Scanner
 }
 
 void VBrokerScanNode::scanner_worker(int start_idx, int length) {
+    Thread::set_self_name("vbroker_scanner");
     Status status = Status::OK();
     ScannerCounter counter;
     for (int i = 0; i < length && status.ok(); ++i) {
@@ -224,4 +226,4 @@ void VBrokerScanNode::scanner_worker(int start_idx, int 
length) {
     }
 }
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scan_node.h 
b/be/src/vec/exec/vbroker_scan_node.h
index ee7f763815..9a3b2fe362 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -33,6 +33,11 @@ public:
     VBrokerScanNode(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
     ~VBrokerScanNode() override = default;
 
+    // Fill the next row batch by calling next() on the scanner,
+    virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* 
eos) override {
+        return Status::NotSupported("Not Implemented 
VBrokerScanNode::get_next.");
+    }
+
     Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
     // Close the scanner, and report errors.
@@ -48,4 +53,4 @@ private:
     std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
 };
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/exec/vbroker_scanner.h 
b/be/src/vec/exec/vbroker_scanner.h
index 46128100ab..469e38d4e8 100644
--- a/be/src/vec/exec/vbroker_scanner.h
+++ b/be/src/vec/exec/vbroker_scanner.h
@@ -29,10 +29,15 @@ public:
                    const std::vector<TExpr>& pre_filter_texprs, 
ScannerCounter* counter);
     ~VBrokerScanner() override = default;
 
-    Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof) 
override;
+    virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* 
eof,
+                            bool* fill_tuple) override {
+        return Status::NotSupported("Not Implemented get next");
+    }
+
+    virtual Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof) 
override;
 
 private:
     Status _convert_one_row(const Slice& line, std::vector<MutableColumnPtr>& 
columns);
     Status _fill_dest_columns(std::vector<MutableColumnPtr>& columns);
 };
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to