This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new be9385d40a [improvement](lock raii) use raii to lock and unlock
(#16652)
be9385d40a is described below
commit be9385d40a653a9220dbc42fdc43c4b054aa7f08
Author: yiguolei <[email protected]>
AuthorDate: Mon Feb 13 14:06:36 2023 +0800
[improvement](lock raii) use raii to lock and unlock (#16652)
* [improvement](lock raii) use raii to lock and unlock
This is part of the exception-safety work: #16366.
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/olap/task/engine_storage_migration_task.cpp | 16 +-
be/src/util/blocking_queue.hpp | 31 ---
be/src/util/fake_lock.h | 40 ---
be/src/util/internal_queue.h | 283 -------------------
be/src/vec/exec/scan/vscanner.h | 3 -
be/src/vec/json/parse2column.cpp | 1 -
be/src/vec/runtime/vdata_stream_mgr.cpp | 10 +-
be/test/CMakeLists.txt | 1 -
be/test/util/internal_queue_test.cpp | 310 ---------------------
9 files changed, 11 insertions(+), 684 deletions(-)
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp
b/be/src/olap/task/engine_storage_migration_task.cpp
index 6ecefe4722..e0d7ca4378 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -91,15 +91,15 @@ Status
EngineStorageMigrationTask::_check_running_txns_until_timeout(
int try_times = 1;
do {
// to avoid invalid loops, the lock is guaranteed to be acquired here
- std::unique_lock<std::shared_mutex>
wlock(_tablet->get_migration_lock());
- res = _check_running_txns();
- if (res.ok()) {
- // transfer the lock to the caller
- *migration_wlock = std::move(wlock);
- return res;
+ {
+ std::unique_lock<std::shared_mutex>
wlock(_tablet->get_migration_lock());
+ res = _check_running_txns();
+ if (res.ok()) {
+ // transfer the lock to the caller
+ *migration_wlock = std::move(wlock);
+ return res;
+ }
}
- // unlock and sleep for a while, try again
- wlock.unlock();
sleep(std::min(config::sleep_one_second * try_times,
CHECK_TXNS_MAX_WAIT_TIME_SECS));
++try_times;
} while (!_is_timeout());
diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp
index dff6911edf..35cd74ee89 100644
--- a/be/src/util/blocking_queue.hpp
+++ b/be/src/util/blocking_queue.hpp
@@ -64,37 +64,6 @@ public:
}
}
- /// Puts an element into the queue, waiting until 'timeout_micros'
elapses, if there is
- /// no space. If the queue is shut down, or if the timeout elapsed without
being able to
- /// put the element, returns false.
- /*
- bool blocking_put_with_timeout(const T& val, int64_t timeout_micros) {
- MonotonicStopWatch timer;
- std::unique_lock<std::mutex> write_lock(_lock);
- std::system_time wtime = std::get_system_time() +
- std::posix_time::microseconds(timeout_micros);
- const struct timespec timeout = std::detail::to_timespec(wtime);
- bool notified = true;
- while (SizeLocked(write_lock) >= _max_elements && !_shutdown &&
notified) {
- timer.Start();
- // Wait until we're notified or until the timeout expires.
- notified = _put_cv.TimedWait(write_lock, &timeout);
- timer.Stop();
- }
- _total_put_wait_time += timer.ElapsedTime();
- // If the list is still full or if the the queue has been shut down,
return false.
- // NOTE: We don't check 'notified' here as it appears that pthread
condition variables
- // have a weird behavior in which they can return ETIMEDOUT from
timed_wait even if
- // another thread did in fact signal
- if (SizeLocked(write_lock) >= _max_elements || _shutdown) return false;
- DCHECK_LT(put_list_.size(), _max_elements);
- _list.push_back(val);
- write_lock.unlock();
- _get_cv.NotifyOne();
- return true;
- }
- */
-
// Puts an element into the queue, waiting indefinitely until there is
space.
// If the queue is shut down, returns false.
bool blocking_put(const T& val) {
diff --git a/be/src/util/fake_lock.h b/be/src/util/fake_lock.h
deleted file mode 100644
index a706371298..0000000000
--- a/be/src/util/fake_lock.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/fake-lock.h
-// and modified by Doris
-
-#pragma once
-
-#include "gutil/macros.h"
-
-namespace doris {
-
-// Implementation of Boost's lockable interface that does nothing. Used to
replace an
-// actual lock implementation in template classes in if no thread safety is
needed.
-class FakeLock {
-public:
- FakeLock() {}
- void lock() {}
- void unlock() {}
- bool try_lock() { return true; }
-
-private:
- DISALLOW_COPY_AND_ASSIGN(FakeLock);
-};
-
-} // namespace doris
diff --git a/be/src/util/internal_queue.h b/be/src/util/internal_queue.h
deleted file mode 100644
index a20944893a..0000000000
--- a/be/src/util/internal_queue.h
+++ /dev/null
@@ -1,283 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-//
https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/internal-queue.h
-// and modified by Doris
-
-#pragma once
-
-#include <functional>
-#include <mutex>
-
-#include "common/logging.h"
-#include "util/fake_lock.h"
-#include "util/spinlock.h"
-
-namespace doris {
-
-/// FIFO queue implemented as a doubly-linked lists with internal pointers.
This is in
-/// contrast to the STL list which allocates a wrapper Node object around the
data. Since
-/// it's an internal queue, the list pointers are maintained in the Nodes
which is memory
-/// owned by the user. The nodes cannot be deallocated while the queue has
elements.
-/// The internal structure is a doubly-linked list.
-/// nullptr <-- N1 <--> N2 <--> N3 --> nullptr
-/// (head) (tail)
-///
-/// InternalQueue<T> instantiates a thread-safe queue where the queue is
protected by an
-/// internal Spinlock. InternalList<T> instantiates a list with no thread
safety.
-///
-/// To use these data structures, the element to be added to the queue or list
must
-/// subclass ::Node.
-///
-/// TODO: this is an ideal candidate to be made lock free.
-
-/// T must be a subclass of InternalQueueBase::Node.
-template <typename LockType, typename T>
-class InternalQueueBase {
-public:
- struct Node {
- public:
- Node() : parent_queue(nullptr), next_node(nullptr), prev_node(nullptr)
{}
- virtual ~Node() {}
-
- /// Returns true if the node is in a queue.
- bool in_queue() const { return parent_queue != nullptr; }
-
- /// Returns the Next/Prev node or nullptr if this is the end/front.
- T* next() const {
- std::lock_guard<LockType> lock(parent_queue->lock_);
- return reinterpret_cast<T*>(next_node);
- }
- T* prev() const {
- std::lock_guard<LockType> lock(parent_queue->lock_);
- return reinterpret_cast<T*>(prev_node);
- }
-
- private:
- friend class InternalQueueBase<LockType, T>;
-
- /// Pointer to the queue this Node is on. nullptr if not on any queue.
- InternalQueueBase<LockType, T>* parent_queue;
- Node* next_node;
- Node* prev_node;
- };
-
- InternalQueueBase() : head_(nullptr), tail_(nullptr), size_(0) {}
-
- /// Returns the element at the head of the list without dequeuing or
nullptr
- /// if the queue is empty. This is O(1).
- T* head() const {
- std::lock_guard<LockType> lock(lock_);
- if (empty()) return nullptr;
- return reinterpret_cast<T*>(head_);
- }
-
- /// Returns the element at the end of the list without dequeuing or nullptr
- /// if the queue is empty. This is O(1).
- T* tail() {
- std::lock_guard<LockType> lock(lock_);
- if (empty()) return nullptr;
- return reinterpret_cast<T*>(tail_);
- }
-
- /// Enqueue node onto the queue's tail. This is O(1).
- void enqueue(T* n) {
- Node* node = (Node*)n;
- DCHECK(node->next_node == nullptr);
- DCHECK(node->prev_node == nullptr);
- DCHECK(node->parent_queue == nullptr);
- node->parent_queue = this;
- {
- std::lock_guard<LockType> lock(lock_);
- if (tail_ != nullptr) tail_->next_node = node;
- node->prev_node = tail_;
- tail_ = node;
- if (head_ == nullptr) head_ = node;
- ++size_;
- }
- }
-
- /// Dequeues an element from the queue's head. Returns nullptr if the queue
- /// is empty. This is O(1).
- T* dequeue() {
- Node* result = nullptr;
- {
- std::lock_guard<LockType> lock(lock_);
- if (empty()) return nullptr;
- --size_;
- result = head_;
- head_ = head_->next_node;
- if (head_ == nullptr) {
- tail_ = nullptr;
- } else {
- head_->prev_node = nullptr;
- }
- }
- DCHECK(result != nullptr);
- result->next_node = result->prev_node = nullptr;
- result->parent_queue = nullptr;
- return reinterpret_cast<T*>(result);
- }
-
- /// Dequeues an element from the queue's tail. Returns nullptr if the queue
- /// is empty. This is O(1).
- T* pop_back() {
- Node* result = nullptr;
- {
- std::lock_guard<LockType> lock(lock_);
- if (empty()) return nullptr;
- --size_;
- result = tail_;
- tail_ = tail_->prev_node;
- if (tail_ == nullptr) {
- head_ = nullptr;
- } else {
- tail_->next_node = nullptr;
- }
- }
- DCHECK(result != nullptr);
- result->next_node = result->prev_node = nullptr;
- result->parent_queue = nullptr;
- return reinterpret_cast<T*>(result);
- }
-
- /// Removes 'node' from the queue. This is O(1). No-op if node is
- /// not on the list. Returns true if removed
- bool remove(T* n) {
- Node* node = (Node*)n;
- if (node->parent_queue != this) return false;
- {
- std::lock_guard<LockType> lock(lock_);
- if (node->next_node == nullptr && node->prev_node == nullptr) {
- // Removing only node
- DCHECK(node == head_);
- DCHECK(tail_ == node);
- head_ = tail_ = nullptr;
- --size_;
- node->parent_queue = nullptr;
- return true;
- }
-
- if (head_ == node) {
- DCHECK(node->prev_node == nullptr);
- head_ = node->next_node;
- } else {
- DCHECK(node->prev_node != nullptr);
- node->prev_node->next_node = node->next_node;
- }
-
- if (node == tail_) {
- DCHECK(node->next_node == nullptr);
- tail_ = node->prev_node;
- } else if (node->next_node != nullptr) {
- node->next_node->prev_node = node->prev_node;
- }
- --size_;
- }
- node->next_node = node->prev_node = nullptr;
- node->parent_queue = nullptr;
- return true;
- }
-
- /// Clears all elements in the list.
- void clear() {
- std::lock_guard<LockType> lock(lock_);
- Node* cur = head_;
- while (cur != nullptr) {
- Node* tmp = cur;
- cur = cur->next_node;
- tmp->prev_node = tmp->next_node = nullptr;
- tmp->parent_queue = nullptr;
- }
- size_ = 0;
- head_ = tail_ = nullptr;
- }
-
- int size() const { return size_; }
- bool empty() const { return head_ == nullptr; }
-
- /// Returns if the target is on the queue. This is O(1) and does not
acquire any locks.
- bool contains(const T* target) const { return target->parent_queue ==
this; }
-
- /// Validates the internal structure of the list
- bool validate() {
- int num_elements_found = 0;
- std::lock_guard<LockType> lock(lock_);
- if (head_ == nullptr) {
- if (tail_ != nullptr) return false;
- if (size() != 0) return false;
- return true;
- }
-
- if (head_->prev_node != nullptr) return false;
- Node* current = head_;
- while (current != nullptr) {
- if (current->parent_queue != this) return false;
- ++num_elements_found;
- Node* next_node = current->next_node;
- if (next_node == nullptr) {
- if (current != tail_) return false;
- } else {
- if (next_node->prev_node != current) return false;
- }
- current = next_node;
- }
- if (num_elements_found != size()) return false;
- return true;
- }
-
- // Iterate over elements of queue, calling 'fn' for each element. If 'fn'
returns
- // false, terminate iteration. It is invalid to call other InternalQueue
methods
- // from 'fn'.
- void iterate(std::function<bool(T*)> fn) {
- std::lock_guard<LockType> lock(lock_);
- for (Node* current = head_; current != nullptr; current =
current->next_node) {
- if (!fn(reinterpret_cast<T*>(current))) return;
- }
- }
-
- /// Prints the queue ptrs to a string.
- std::string DebugString() {
- std::stringstream ss;
- ss << "(";
- {
- std::lock_guard<LockType> lock(lock_);
- Node* curr = head_;
- while (curr != nullptr) {
- ss << (void*)curr;
- curr = curr->next_node;
- }
- }
- ss << ")";
- return ss.str();
- }
-
-private:
- friend struct Node;
- mutable LockType lock_;
- Node *head_, *tail_;
- int size_;
-};
-
-// The default LockType is SpinLock.
-template <typename T>
-class InternalQueue : public InternalQueueBase<SpinLock, T> {};
-
-// InternalList is a non-threadsafe implementation.
-template <typename T>
-class InternalList : public InternalQueueBase<FakeLock, T> {};
-} // namespace doris
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 8c01727ba8..a909e91ee3 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -138,9 +138,6 @@ protected:
// If _input_tuple_desc is set, the scanner will read data into
// this _input_block first, then convert to the output block.
Block _input_block;
- // If _input_tuple_desc is set, this will point to _input_block,
- // otherwise, it will point to the output block.
- Block* _input_block_ptr;
bool _is_open = false;
bool _is_closed = false;
diff --git a/be/src/vec/json/parse2column.cpp b/be/src/vec/json/parse2column.cpp
index 6a6474c8ad..3970bb0156 100644
--- a/be/src/vec/json/parse2column.cpp
+++ b/be/src/vec/json/parse2column.cpp
@@ -63,7 +63,6 @@ public:
Pointer get(Factory&& f) {
std::unique_lock lock(mutex);
if (stack.empty()) {
- lock.unlock();
return {f(), this};
}
auto object = stack.top().release();
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index b89935b1c0..282d641431 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -67,8 +67,10 @@ std::shared_ptr<VDataStreamRecvr>
VDataStreamMgr::find_recvr(const TUniqueId& fr
bool
acquire_lock) {
VLOG_ROW << "looking up fragment_instance_id=" << fragment_instance_id <<
", node=" << node_id;
size_t hash_value = get_hash_value(fragment_instance_id, node_id);
+ // Create lock guard and not own lock currently and will lock conditionally
+ std::unique_lock recvr_lock(_lock, std::defer_lock);
if (acquire_lock) {
- _lock.lock();
+ recvr_lock.lock();
}
std::pair<StreamMap::iterator, StreamMap::iterator> range =
_receiver_map.equal_range(hash_value);
@@ -76,16 +78,10 @@ std::shared_ptr<VDataStreamRecvr>
VDataStreamMgr::find_recvr(const TUniqueId& fr
auto recvr = range.first->second;
if (recvr->fragment_instance_id() == fragment_instance_id &&
recvr->dest_node_id() == node_id) {
- if (acquire_lock) {
- _lock.unlock();
- }
return recvr;
}
++range.first;
}
- if (acquire_lock) {
- _lock.unlock();
- }
return std::shared_ptr<VDataStreamRecvr>();
}
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 7fa7493bf6..2473ac272d 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -176,7 +176,6 @@ set(UTIL_TEST_FILES
util/crc32c_test.cpp
util/lru_cache_util_test.cpp
util/filesystem_util_test.cpp
- util/internal_queue_test.cpp
util/cidr_test.cpp
util/metrics_test.cpp
util/doris_metrics_test.cpp
diff --git a/be/test/util/internal_queue_test.cpp
b/be/test/util/internal_queue_test.cpp
deleted file mode 100644
index fe11c9059e..0000000000
--- a/be/test/util/internal_queue_test.cpp
+++ /dev/null
@@ -1,310 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/internal_queue.h"
-
-#include <gtest/gtest.h>
-#include <unistd.h>
-
-#include <mutex>
-#include <thread>
-
-#include "common/configbase.h"
-#include "testutil/test_util.h"
-#include "util/thread_group.h"
-
-using std::vector;
-using std::thread;
-
-namespace doris {
-
-struct IntNode : public InternalQueue<IntNode>::Node {
- IntNode() : value() {}
- IntNode(int value) : value(value) {}
- int value;
-};
-
-// Basic single threaded operation.
-TEST(InternalQueue, TestBasic) {
- IntNode one(1);
- IntNode two(2);
- IntNode three(3);
- IntNode four(4);
-
- InternalQueue<IntNode> list;
- EXPECT_TRUE(list.empty());
- EXPECT_EQ(list.size(), 0);
- EXPECT_TRUE(list.dequeue() == nullptr);
- EXPECT_TRUE(list.validate());
-
- list.enqueue(&one);
- EXPECT_TRUE(!list.empty());
- EXPECT_EQ(list.size(), 1);
- IntNode* i = list.dequeue();
- EXPECT_TRUE(i != nullptr);
- EXPECT_TRUE(list.empty());
- EXPECT_EQ(list.size(), 0);
- EXPECT_EQ(i->value, 1);
- EXPECT_TRUE(list.validate());
-
- list.enqueue(&one);
- list.enqueue(&two);
- list.enqueue(&three);
- list.enqueue(&four);
- EXPECT_EQ(list.size(), 4);
- EXPECT_TRUE(list.validate());
-
- i = list.dequeue();
- EXPECT_TRUE(i != nullptr);
- EXPECT_EQ(i->value, 1);
- EXPECT_TRUE(list.validate());
-
- i = list.dequeue();
- EXPECT_TRUE(i != nullptr);
- EXPECT_EQ(i->value, 2);
- EXPECT_TRUE(list.validate());
-
- i = list.dequeue();
- EXPECT_TRUE(i != nullptr);
- EXPECT_EQ(i->value, 3);
- EXPECT_TRUE(list.validate());
-
- i = list.dequeue();
- EXPECT_TRUE(i != nullptr);
- EXPECT_EQ(i->value, 4);
- EXPECT_TRUE(list.validate());
-
- list.enqueue(&one);
- list.enqueue(&two);
- list.enqueue(&three);
- list.enqueue(&four);
-
- IntNode* node = list.head();
- int val = 1;
- while (node != nullptr) {
- EXPECT_EQ(node->value, val);
- node = node->next();
- ++val;
- }
-
- node = list.tail();
- val = 4;
- while (node != nullptr) {
- EXPECT_EQ(node->value, val);
- node = node->prev();
- --val;
- }
-
- for (int i = 0; i < 4; ++i) {
- node = list.pop_back();
- EXPECT_TRUE(node != nullptr);
- EXPECT_EQ(node->value, 4 - i);
- EXPECT_TRUE(list.validate());
- }
- EXPECT_TRUE(list.pop_back() == nullptr);
- EXPECT_EQ(list.size(), 0);
- EXPECT_TRUE(list.empty());
-}
-
-// Add all the nodes and then remove every other one.
-TEST(InternalQueue, TestRemove) {
- std::vector<IntNode> nodes;
- nodes.resize(100);
-
- InternalQueue<IntNode> queue;
-
- queue.enqueue(&nodes[0]);
- queue.remove(&nodes[1]);
- EXPECT_TRUE(queue.validate());
- queue.remove(&nodes[0]);
- EXPECT_TRUE(queue.validate());
- queue.remove(&nodes[0]);
- EXPECT_TRUE(queue.validate());
-
- for (int i = 0; i < nodes.size(); ++i) {
- nodes[i].value = i;
- queue.enqueue(&nodes[i]);
- }
-
- for (int i = 0; i < nodes.size(); i += 2) {
- queue.remove(&nodes[i]);
- EXPECT_TRUE(queue.validate());
- }
-
- EXPECT_EQ(queue.size(), nodes.size() / 2);
- for (int i = 0; i < nodes.size() / 2; ++i) {
- IntNode* node = queue.dequeue();
- EXPECT_TRUE(node != nullptr);
- EXPECT_EQ(node->value, i * 2 + 1);
- }
-}
-
-const int VALIDATE_INTERVAL = 10000;
-
-// CHECK() is not thread safe so return the result in *failed.
-void ProducerThread(InternalQueue<IntNode>* queue, int num_inserts,
std::vector<IntNode>* nodes,
- std::atomic<int32_t>* counter, bool* failed) {
- for (int i = 0; i < num_inserts && !*failed; ++i) {
- // Get the next index to queue.
- int32_t value = (*counter)++;
- nodes->at(value).value = value;
- queue->enqueue(&nodes->at(value));
- if (i % VALIDATE_INTERVAL == 0) {
- if (!queue->validate()) {
- *failed = true;
- }
- }
- }
-}
-
-void ConsumerThread(InternalQueue<IntNode>* queue, int num_consumes, int delta,
- std::vector<int>* results, bool* failed) {
- // Dequeued nodes should be strictly increasing.
- int previous_value = -1;
- for (int i = 0; i < num_consumes && !*failed;) {
- IntNode* node = queue->dequeue();
- if (node == nullptr) {
- continue;
- }
- ++i;
- if (delta > 0) {
- if (node->value != previous_value + delta) {
- *failed = true;
- }
- } else if (delta == 0) {
- if (node->value <= previous_value) {
- *failed = true;
- }
- }
- results->push_back(node->value);
- previous_value = node->value;
- if (i % VALIDATE_INTERVAL == 0) {
- if (!queue->validate()) {
- *failed = true;
- }
- }
- }
-}
-
-TEST(InternalQueue, TestClear) {
- std::vector<IntNode> nodes;
- nodes.resize(100);
- InternalQueue<IntNode> queue;
- queue.enqueue(&nodes[0]);
- queue.enqueue(&nodes[1]);
- queue.enqueue(&nodes[2]);
-
- queue.clear();
- EXPECT_TRUE(queue.validate());
- EXPECT_TRUE(queue.empty());
-
- queue.enqueue(&nodes[0]);
- queue.enqueue(&nodes[1]);
- queue.enqueue(&nodes[2]);
- EXPECT_TRUE(queue.validate());
- EXPECT_EQ(queue.size(), 3);
-}
-
-TEST(InternalQueue, TestSingleProducerSingleConsumer) {
- std::vector<IntNode> nodes;
- std::atomic<int32_t> counter = 0;
- nodes.resize(LOOP_LESS_OR_MORE(100, 1000000));
- std::vector<int> results;
-
- InternalQueue<IntNode> queue;
- bool failed = false;
- ProducerThread(&queue, nodes.size(), &nodes, &counter, &failed);
- ConsumerThread(&queue, nodes.size(), 1, &results, &failed);
- EXPECT_TRUE(!failed);
- EXPECT_TRUE(queue.empty());
- EXPECT_EQ(results.size(), nodes.size());
-
- counter = 0;
- results.clear();
- thread producer_thread(ProducerThread, &queue, nodes.size(), &nodes,
&counter, &failed);
- thread consumer_thread(ConsumerThread, &queue, nodes.size(), 1, &results,
&failed);
- producer_thread.join();
- consumer_thread.join();
- EXPECT_TRUE(!failed);
- EXPECT_TRUE(queue.empty());
- EXPECT_EQ(results.size(), nodes.size());
-}
-
-TEST(InternalQueue, TestMultiProducerMultiConsumer) {
- std::vector<IntNode> nodes;
- nodes.resize(LOOP_LESS_OR_MORE(100, 1000000));
-
- bool failed = false;
- for (int num_producers = 1; num_producers < 5; num_producers += 3) {
- std::atomic<int32_t> counter = 0;
- const int NUM_CONSUMERS = 4;
- EXPECT_EQ(nodes.size() % NUM_CONSUMERS, 0);
- EXPECT_EQ(nodes.size() % num_producers, 0);
- const int num_per_consumer = nodes.size() / NUM_CONSUMERS;
- const int num_per_producer = nodes.size() / num_producers;
-
- std::vector<vector<int>> results;
- results.resize(NUM_CONSUMERS);
-
- int expected_delta = -1;
- if (NUM_CONSUMERS == 1 && num_producers == 1) {
- // With one producer and consumer, the queue should have
sequential values.
- expected_delta = 1;
- } else if (num_producers == 1) {
- // With one producer, the values added are sequential but can be
read off
- // with gaps in each consumer thread. E.g. thread1 reads: 1, 4,
5, 7, etc.
- // but they should be strictly increasing.
- expected_delta = 0;
- } else {
- // With multiple producers there isn't a guarantee on the order
values get
- // enqueued.
- expected_delta = -1;
- }
-
- InternalQueue<IntNode> queue;
- ThreadGroup consumers;
- ThreadGroup producers;
-
- for (int i = 0; i < num_producers; ++i) {
- producers.add_thread(new thread(ProducerThread, &queue,
num_per_producer, &nodes,
- &counter, &failed));
- }
-
- for (int i = 0; i < NUM_CONSUMERS; ++i) {
- consumers.add_thread(new thread(ConsumerThread, &queue,
num_per_consumer,
- expected_delta, &results[i],
&failed));
- }
-
- producers.join_all();
- consumers.join_all();
- EXPECT_TRUE(queue.empty());
- EXPECT_TRUE(!failed);
-
- std::vector<int> all_results;
- for (int i = 0; i < NUM_CONSUMERS; ++i) {
- EXPECT_EQ(results[i].size(), num_per_consumer);
- all_results.insert(all_results.end(), results[i].begin(),
results[i].end());
- }
- EXPECT_EQ(all_results.size(), nodes.size());
- sort(all_results.begin(), all_results.end());
- for (int i = 0; i < all_results.size(); ++i) {
- EXPECT_EQ(i, all_results[i]) << all_results[i - 1] << " " <<
all_results[i + 1];
- }
- }
-}
-
-} // end namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]