This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 832338c55e [improvement] set name for scanner threads and fix compile
error in clang (#9336)
832338c55e is described below
commit 832338c55ee70a70b690a77a07e1f4115dd7a9c2
Author: dataroaring <[email protected]>
AuthorDate: Thu May 5 09:53:43 2022 +0800
[improvement] set name for scanner threads and fix compile error in clang
(#9336)
---
be/src/exec/base_scanner.h | 2 +-
be/src/exec/broker_scan_node.cpp | 4 +++-
be/src/exec/broker_scanner.h | 3 ++-
be/src/exec/olap_scan_node.cpp | 1 +
be/src/util/thread.cpp | 4 ++++
be/src/util/thread.h | 2 ++
be/src/vec/exec/vbroker_scan_node.cpp | 4 +++-
be/src/vec/exec/vbroker_scan_node.h | 7 ++++++-
be/src/vec/exec/vbroker_scanner.h | 9 +++++++--
9 files changed, 29 insertions(+), 7 deletions(-)
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 8b97a97a3f..13285ab6aa 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -63,7 +63,7 @@ public:
// Get next block
virtual Status get_next(std::vector<vectorized::MutableColumnPtr>&
columns, bool* eof) {
- return Status::NotSupported("Not Implemented get block");
+ return Status::NotSupported("Not Implemented get next");
}
// Close this scanner
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 4ae286e944..e1dbd255ec 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -31,6 +31,7 @@
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
+#include "util/thread.h"
namespace doris {
@@ -256,6 +257,7 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange&
scan_range,
const std::vector<ExprContext*>&
conjunct_ctxs,
ScannerCounter* counter) {
//create scanner object and open
+ Thread::set_self_name("broker_scanner");
std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, counter);
RETURN_IF_ERROR(scanner->open());
bool scanner_eof = false;
@@ -392,4 +394,4 @@ void BrokerScanNode::scanner_worker(int start_idx, int
length) {
Expr::close(scanner_expr_ctxs, _runtime_state);
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 7906b98ff4..344d3789ae 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -62,7 +62,8 @@ public:
Status open() override;
// Get next tuple
- Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool*
fill_tuple) override;
+ virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof,
+ bool* fill_tuple) override;
Status get_next(std::vector<vectorized::MutableColumnPtr>& columns, bool*
eof) override {
return Status::NotSupported("Not Implemented get columns");
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index e17e26bcd9..61befd7eda 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -1527,6 +1527,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
void OlapScanNode::scanner_thread(OlapScanner* scanner) {
SCOPED_ATTACH_TASK_THREAD(_runtime_state, mem_tracker());
ADD_THREAD_LOCAL_MEM_TRACKER(scanner->mem_tracker());
+ Thread::set_self_name("olap_scanner");
if (UNLIKELY(_transfer_done)) {
_scanner_done = true;
std::unique_lock<std::mutex> l(_scan_batches_lock);
diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp
index 64e7a50142..3be6d8b0e4 100644
--- a/be/src/util/thread.cpp
+++ b/be/src/util/thread.cpp
@@ -258,6 +258,10 @@ Thread::~Thread() {
}
}
+void Thread::set_self_name(const std::string& name) {
+ ThreadMgr::set_thread_name(name, current_thread_id());
+}
+
void Thread::join() {
ThreadJoiner(this).join();
}
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index dd2a96f4b5..0c4327a65d 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -88,6 +88,8 @@ public:
return start_thread(category, name, std::bind(f, a1, a2, a3, a4, a5,
a6), NO_FLAGS, holder);
}
+ static void set_self_name(const std::string& name);
+
~Thread();
// Blocks until this thread finishes execution. Once this method returns,
the thread
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp
b/be/src/vec/exec/vbroker_scan_node.cpp
index c1d597824c..d12f3e8d85 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -24,6 +24,7 @@
#include "runtime/tuple_row.h"
#include "runtime/mem_tracker.h"
#include "util/runtime_profile.h"
+#include "util/thread.h"
#include "util/types.h"
#include "vec/exprs/vexpr_context.h"
@@ -192,6 +193,7 @@ Status VBrokerScanNode::scanner_scan(const
TBrokerScanRange& scan_range, Scanner
}
void VBrokerScanNode::scanner_worker(int start_idx, int length) {
+ Thread::set_self_name("vbroker_scanner");
Status status = Status::OK();
ScannerCounter counter;
for (int i = 0; i < length && status.ok(); ++i) {
@@ -224,4 +226,4 @@ void VBrokerScanNode::scanner_worker(int start_idx, int
length) {
}
}
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vbroker_scan_node.h
b/be/src/vec/exec/vbroker_scan_node.h
index ee7f763815..9a3b2fe362 100644
--- a/be/src/vec/exec/vbroker_scan_node.h
+++ b/be/src/vec/exec/vbroker_scan_node.h
@@ -33,6 +33,11 @@ public:
VBrokerScanNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
~VBrokerScanNode() override = default;
+ // Fill the next row batch by calling next() on the scanner,
+ virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool*
eos) override {
+ return Status::NotSupported("Not Implemented
VBrokerScanNode::get_next.");
+ }
+
Status get_next(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
// Close the scanner, and report errors.
@@ -48,4 +53,4 @@ private:
std::deque<std::shared_ptr<vectorized::Block>> _block_queue;
};
} // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/exec/vbroker_scanner.h
b/be/src/vec/exec/vbroker_scanner.h
index 46128100ab..469e38d4e8 100644
--- a/be/src/vec/exec/vbroker_scanner.h
+++ b/be/src/vec/exec/vbroker_scanner.h
@@ -29,10 +29,15 @@ public:
const std::vector<TExpr>& pre_filter_texprs,
ScannerCounter* counter);
~VBrokerScanner() override = default;
- Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof)
override;
+ virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool*
eof,
+ bool* fill_tuple) override {
+ return Status::NotSupported("Not Implemented get next");
+ }
+
+ virtual Status get_next(std::vector<MutableColumnPtr>& columns, bool* eof)
override;
private:
Status _convert_one_row(const Slice& line, std::vector<MutableColumnPtr>&
columns);
Status _fill_dest_columns(std::vector<MutableColumnPtr>& columns);
};
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]