This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new be9385d40a [improvement](lock raii) use raii to lock and unlock 
(#16652)
be9385d40a is described below

commit be9385d40a653a9220dbc42fdc43c4b054aa7f08
Author: yiguolei <[email protected]>
AuthorDate: Mon Feb 13 14:06:36 2023 +0800

    [improvement](lock raii) use raii to lock and unlock (#16652)
    
    * [improvement](lock raii) use raii to lock and unlock
    
    This is part of exception safe: #16366.
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/olap/task/engine_storage_migration_task.cpp |  16 +-
 be/src/util/blocking_queue.hpp                     |  31 ---
 be/src/util/fake_lock.h                            |  40 ---
 be/src/util/internal_queue.h                       | 283 -------------------
 be/src/vec/exec/scan/vscanner.h                    |   3 -
 be/src/vec/json/parse2column.cpp                   |   1 -
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  10 +-
 be/test/CMakeLists.txt                             |   1 -
 be/test/util/internal_queue_test.cpp               | 310 ---------------------
 9 files changed, 11 insertions(+), 684 deletions(-)

diff --git a/be/src/olap/task/engine_storage_migration_task.cpp 
b/be/src/olap/task/engine_storage_migration_task.cpp
index 6ecefe4722..e0d7ca4378 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -91,15 +91,15 @@ Status 
EngineStorageMigrationTask::_check_running_txns_until_timeout(
     int try_times = 1;
     do {
         // to avoid invalid loops, the lock is guaranteed to be acquired here
-        std::unique_lock<std::shared_mutex> 
wlock(_tablet->get_migration_lock());
-        res = _check_running_txns();
-        if (res.ok()) {
-            // transfer the lock to the caller
-            *migration_wlock = std::move(wlock);
-            return res;
+        {
+            std::unique_lock<std::shared_mutex> 
wlock(_tablet->get_migration_lock());
+            res = _check_running_txns();
+            if (res.ok()) {
+                // transfer the lock to the caller
+                *migration_wlock = std::move(wlock);
+                return res;
+            }
         }
-        // unlock and sleep for a while, try again
-        wlock.unlock();
         sleep(std::min(config::sleep_one_second * try_times, 
CHECK_TXNS_MAX_WAIT_TIME_SECS));
         ++try_times;
     } while (!_is_timeout());
diff --git a/be/src/util/blocking_queue.hpp b/be/src/util/blocking_queue.hpp
index dff6911edf..35cd74ee89 100644
--- a/be/src/util/blocking_queue.hpp
+++ b/be/src/util/blocking_queue.hpp
@@ -64,37 +64,6 @@ public:
         }
     }
 
-    /// Puts an element into the queue, waiting until 'timeout_micros' 
elapses, if there is
-    /// no space. If the queue is shut down, or if the timeout elapsed without 
being able to
-    /// put the element, returns false.
-    /*
-    bool blocking_put_with_timeout(const T& val, int64_t timeout_micros) {
-        MonotonicStopWatch timer;
-        std::unique_lock<std::mutex> write_lock(_lock);
-        std::system_time wtime = std::get_system_time() +
-            std::posix_time::microseconds(timeout_micros);
-        const struct timespec timeout = std::detail::to_timespec(wtime);
-        bool notified = true;
-        while (SizeLocked(write_lock) >= _max_elements && !_shutdown && 
notified) {
-            timer.Start();
-            // Wait until we're notified or until the timeout expires.
-            notified = _put_cv.TimedWait(write_lock, &timeout);
-            timer.Stop();
-        }
-        _total_put_wait_time += timer.ElapsedTime();
-        // If the list is still full or if the the queue has been shut down, 
return false.
-        // NOTE: We don't check 'notified' here as it appears that pthread 
condition variables
-        // have a weird behavior in which they can return ETIMEDOUT from 
timed_wait even if
-        // another thread did in fact signal
-        if (SizeLocked(write_lock) >= _max_elements || _shutdown) return false;
-        DCHECK_LT(put_list_.size(), _max_elements);
-        _list.push_back(val);
-        write_lock.unlock();
-        _get_cv.NotifyOne();
-        return true;
-    }
-    */
-
     // Puts an element into the queue, waiting indefinitely until there is 
space.
     // If the queue is shut down, returns false.
     bool blocking_put(const T& val) {
diff --git a/be/src/util/fake_lock.h b/be/src/util/fake_lock.h
deleted file mode 100644
index a706371298..0000000000
--- a/be/src/util/fake_lock.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/fake-lock.h
-// and modified by Doris
-
-#pragma once
-
-#include "gutil/macros.h"
-
-namespace doris {
-
-// Implementation of Boost's lockable interface that does nothing. Used to 
replace an
-// actual lock implementation in template classes in if no thread safety is 
needed.
-class FakeLock {
-public:
-    FakeLock() {}
-    void lock() {}
-    void unlock() {}
-    bool try_lock() { return true; }
-
-private:
-    DISALLOW_COPY_AND_ASSIGN(FakeLock);
-};
-
-} // namespace doris
diff --git a/be/src/util/internal_queue.h b/be/src/util/internal_queue.h
deleted file mode 100644
index a20944893a..0000000000
--- a/be/src/util/internal_queue.h
+++ /dev/null
@@ -1,283 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-// This file is copied from
-// 
https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/internal-queue.h
-// and modified by Doris
-
-#pragma once
-
-#include <functional>
-#include <mutex>
-
-#include "common/logging.h"
-#include "util/fake_lock.h"
-#include "util/spinlock.h"
-
-namespace doris {
-
-/// FIFO queue implemented as a doubly-linked lists with internal pointers. 
This is in
-/// contrast to the STL list which allocates a wrapper Node object around the 
data. Since
-/// it's an internal queue, the list pointers are maintained in the Nodes 
which is memory
-/// owned by the user. The nodes cannot be deallocated while the queue has 
elements.
-/// The internal structure is a doubly-linked list.
-///  nullptr <-- N1 <--> N2 <--> N3 --> nullptr
-///          (head)          (tail)
-///
-/// InternalQueue<T> instantiates a thread-safe queue where the queue is 
protected by an
-/// internal Spinlock. InternalList<T> instantiates a list with no thread 
safety.
-///
-/// To use these data structures, the element to be added to the queue or list 
must
-/// subclass ::Node.
-///
-/// TODO: this is an ideal candidate to be made lock free.
-
-/// T must be a subclass of InternalQueueBase::Node.
-template <typename LockType, typename T>
-class InternalQueueBase {
-public:
-    struct Node {
-    public:
-        Node() : parent_queue(nullptr), next_node(nullptr), prev_node(nullptr) 
{}
-        virtual ~Node() {}
-
-        /// Returns true if the node is in a queue.
-        bool in_queue() const { return parent_queue != nullptr; }
-
-        /// Returns the Next/Prev node or nullptr if this is the end/front.
-        T* next() const {
-            std::lock_guard<LockType> lock(parent_queue->lock_);
-            return reinterpret_cast<T*>(next_node);
-        }
-        T* prev() const {
-            std::lock_guard<LockType> lock(parent_queue->lock_);
-            return reinterpret_cast<T*>(prev_node);
-        }
-
-    private:
-        friend class InternalQueueBase<LockType, T>;
-
-        /// Pointer to the queue this Node is on. nullptr if not on any queue.
-        InternalQueueBase<LockType, T>* parent_queue;
-        Node* next_node;
-        Node* prev_node;
-    };
-
-    InternalQueueBase() : head_(nullptr), tail_(nullptr), size_(0) {}
-
-    /// Returns the element at the head of the list without dequeuing or 
nullptr
-    /// if the queue is empty. This is O(1).
-    T* head() const {
-        std::lock_guard<LockType> lock(lock_);
-        if (empty()) return nullptr;
-        return reinterpret_cast<T*>(head_);
-    }
-
-    /// Returns the element at the end of the list without dequeuing or nullptr
-    /// if the queue is empty. This is O(1).
-    T* tail() {
-        std::lock_guard<LockType> lock(lock_);
-        if (empty()) return nullptr;
-        return reinterpret_cast<T*>(tail_);
-    }
-
-    /// Enqueue node onto the queue's tail. This is O(1).
-    void enqueue(T* n) {
-        Node* node = (Node*)n;
-        DCHECK(node->next_node == nullptr);
-        DCHECK(node->prev_node == nullptr);
-        DCHECK(node->parent_queue == nullptr);
-        node->parent_queue = this;
-        {
-            std::lock_guard<LockType> lock(lock_);
-            if (tail_ != nullptr) tail_->next_node = node;
-            node->prev_node = tail_;
-            tail_ = node;
-            if (head_ == nullptr) head_ = node;
-            ++size_;
-        }
-    }
-
-    /// Dequeues an element from the queue's head. Returns nullptr if the queue
-    /// is empty. This is O(1).
-    T* dequeue() {
-        Node* result = nullptr;
-        {
-            std::lock_guard<LockType> lock(lock_);
-            if (empty()) return nullptr;
-            --size_;
-            result = head_;
-            head_ = head_->next_node;
-            if (head_ == nullptr) {
-                tail_ = nullptr;
-            } else {
-                head_->prev_node = nullptr;
-            }
-        }
-        DCHECK(result != nullptr);
-        result->next_node = result->prev_node = nullptr;
-        result->parent_queue = nullptr;
-        return reinterpret_cast<T*>(result);
-    }
-
-    /// Dequeues an element from the queue's tail. Returns nullptr if the queue
-    /// is empty. This is O(1).
-    T* pop_back() {
-        Node* result = nullptr;
-        {
-            std::lock_guard<LockType> lock(lock_);
-            if (empty()) return nullptr;
-            --size_;
-            result = tail_;
-            tail_ = tail_->prev_node;
-            if (tail_ == nullptr) {
-                head_ = nullptr;
-            } else {
-                tail_->next_node = nullptr;
-            }
-        }
-        DCHECK(result != nullptr);
-        result->next_node = result->prev_node = nullptr;
-        result->parent_queue = nullptr;
-        return reinterpret_cast<T*>(result);
-    }
-
-    /// Removes 'node' from the queue. This is O(1). No-op if node is
-    /// not on the list. Returns true if removed
-    bool remove(T* n) {
-        Node* node = (Node*)n;
-        if (node->parent_queue != this) return false;
-        {
-            std::lock_guard<LockType> lock(lock_);
-            if (node->next_node == nullptr && node->prev_node == nullptr) {
-                // Removing only node
-                DCHECK(node == head_);
-                DCHECK(tail_ == node);
-                head_ = tail_ = nullptr;
-                --size_;
-                node->parent_queue = nullptr;
-                return true;
-            }
-
-            if (head_ == node) {
-                DCHECK(node->prev_node == nullptr);
-                head_ = node->next_node;
-            } else {
-                DCHECK(node->prev_node != nullptr);
-                node->prev_node->next_node = node->next_node;
-            }
-
-            if (node == tail_) {
-                DCHECK(node->next_node == nullptr);
-                tail_ = node->prev_node;
-            } else if (node->next_node != nullptr) {
-                node->next_node->prev_node = node->prev_node;
-            }
-            --size_;
-        }
-        node->next_node = node->prev_node = nullptr;
-        node->parent_queue = nullptr;
-        return true;
-    }
-
-    /// Clears all elements in the list.
-    void clear() {
-        std::lock_guard<LockType> lock(lock_);
-        Node* cur = head_;
-        while (cur != nullptr) {
-            Node* tmp = cur;
-            cur = cur->next_node;
-            tmp->prev_node = tmp->next_node = nullptr;
-            tmp->parent_queue = nullptr;
-        }
-        size_ = 0;
-        head_ = tail_ = nullptr;
-    }
-
-    int size() const { return size_; }
-    bool empty() const { return head_ == nullptr; }
-
-    /// Returns if the target is on the queue. This is O(1) and does not 
acquire any locks.
-    bool contains(const T* target) const { return target->parent_queue == 
this; }
-
-    /// Validates the internal structure of the list
-    bool validate() {
-        int num_elements_found = 0;
-        std::lock_guard<LockType> lock(lock_);
-        if (head_ == nullptr) {
-            if (tail_ != nullptr) return false;
-            if (size() != 0) return false;
-            return true;
-        }
-
-        if (head_->prev_node != nullptr) return false;
-        Node* current = head_;
-        while (current != nullptr) {
-            if (current->parent_queue != this) return false;
-            ++num_elements_found;
-            Node* next_node = current->next_node;
-            if (next_node == nullptr) {
-                if (current != tail_) return false;
-            } else {
-                if (next_node->prev_node != current) return false;
-            }
-            current = next_node;
-        }
-        if (num_elements_found != size()) return false;
-        return true;
-    }
-
-    // Iterate over elements of queue, calling 'fn' for each element. If 'fn' 
returns
-    // false, terminate iteration. It is invalid to call other InternalQueue 
methods
-    // from 'fn'.
-    void iterate(std::function<bool(T*)> fn) {
-        std::lock_guard<LockType> lock(lock_);
-        for (Node* current = head_; current != nullptr; current = 
current->next_node) {
-            if (!fn(reinterpret_cast<T*>(current))) return;
-        }
-    }
-
-    /// Prints the queue ptrs to a string.
-    std::string DebugString() {
-        std::stringstream ss;
-        ss << "(";
-        {
-            std::lock_guard<LockType> lock(lock_);
-            Node* curr = head_;
-            while (curr != nullptr) {
-                ss << (void*)curr;
-                curr = curr->next_node;
-            }
-        }
-        ss << ")";
-        return ss.str();
-    }
-
-private:
-    friend struct Node;
-    mutable LockType lock_;
-    Node *head_, *tail_;
-    int size_;
-};
-
-// The default LockType is SpinLock.
-template <typename T>
-class InternalQueue : public InternalQueueBase<SpinLock, T> {};
-
-// InternalList is a non-threadsafe implementation.
-template <typename T>
-class InternalList : public InternalQueueBase<FakeLock, T> {};
-} // namespace doris
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 8c01727ba8..a909e91ee3 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -138,9 +138,6 @@ protected:
     // If _input_tuple_desc is set, the scanner will read data into
     // this _input_block first, then convert to the output block.
     Block _input_block;
-    // If _input_tuple_desc is set, this will point to _input_block,
-    // otherwise, it will point to the output block.
-    Block* _input_block_ptr;
 
     bool _is_open = false;
     bool _is_closed = false;
diff --git a/be/src/vec/json/parse2column.cpp b/be/src/vec/json/parse2column.cpp
index 6a6474c8ad..3970bb0156 100644
--- a/be/src/vec/json/parse2column.cpp
+++ b/be/src/vec/json/parse2column.cpp
@@ -63,7 +63,6 @@ public:
     Pointer get(Factory&& f) {
         std::unique_lock lock(mutex);
         if (stack.empty()) {
-            lock.unlock();
             return {f(), this};
         }
         auto object = stack.top().release();
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp 
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index b89935b1c0..282d641431 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -67,8 +67,10 @@ std::shared_ptr<VDataStreamRecvr> 
VDataStreamMgr::find_recvr(const TUniqueId& fr
                                                              bool 
acquire_lock) {
     VLOG_ROW << "looking up fragment_instance_id=" << fragment_instance_id << 
", node=" << node_id;
     size_t hash_value = get_hash_value(fragment_instance_id, node_id);
+    // Create lock guard and not own lock currently and will lock conditionally
+    std::unique_lock recvr_lock(_lock, std::defer_lock);
     if (acquire_lock) {
-        _lock.lock();
+        recvr_lock.lock();
     }
     std::pair<StreamMap::iterator, StreamMap::iterator> range =
             _receiver_map.equal_range(hash_value);
@@ -76,16 +78,10 @@ std::shared_ptr<VDataStreamRecvr> 
VDataStreamMgr::find_recvr(const TUniqueId& fr
         auto recvr = range.first->second;
         if (recvr->fragment_instance_id() == fragment_instance_id &&
             recvr->dest_node_id() == node_id) {
-            if (acquire_lock) {
-                _lock.unlock();
-            }
             return recvr;
         }
         ++range.first;
     }
-    if (acquire_lock) {
-        _lock.unlock();
-    }
     return std::shared_ptr<VDataStreamRecvr>();
 }
 
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 7fa7493bf6..2473ac272d 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -176,7 +176,6 @@ set(UTIL_TEST_FILES
     util/crc32c_test.cpp
     util/lru_cache_util_test.cpp
     util/filesystem_util_test.cpp
-    util/internal_queue_test.cpp
     util/cidr_test.cpp
     util/metrics_test.cpp
     util/doris_metrics_test.cpp
diff --git a/be/test/util/internal_queue_test.cpp 
b/be/test/util/internal_queue_test.cpp
deleted file mode 100644
index fe11c9059e..0000000000
--- a/be/test/util/internal_queue_test.cpp
+++ /dev/null
@@ -1,310 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "util/internal_queue.h"
-
-#include <gtest/gtest.h>
-#include <unistd.h>
-
-#include <mutex>
-#include <thread>
-
-#include "common/configbase.h"
-#include "testutil/test_util.h"
-#include "util/thread_group.h"
-
-using std::vector;
-using std::thread;
-
-namespace doris {
-
-struct IntNode : public InternalQueue<IntNode>::Node {
-    IntNode() : value() {}
-    IntNode(int value) : value(value) {}
-    int value;
-};
-
-// Basic single threaded operation.
-TEST(InternalQueue, TestBasic) {
-    IntNode one(1);
-    IntNode two(2);
-    IntNode three(3);
-    IntNode four(4);
-
-    InternalQueue<IntNode> list;
-    EXPECT_TRUE(list.empty());
-    EXPECT_EQ(list.size(), 0);
-    EXPECT_TRUE(list.dequeue() == nullptr);
-    EXPECT_TRUE(list.validate());
-
-    list.enqueue(&one);
-    EXPECT_TRUE(!list.empty());
-    EXPECT_EQ(list.size(), 1);
-    IntNode* i = list.dequeue();
-    EXPECT_TRUE(i != nullptr);
-    EXPECT_TRUE(list.empty());
-    EXPECT_EQ(list.size(), 0);
-    EXPECT_EQ(i->value, 1);
-    EXPECT_TRUE(list.validate());
-
-    list.enqueue(&one);
-    list.enqueue(&two);
-    list.enqueue(&three);
-    list.enqueue(&four);
-    EXPECT_EQ(list.size(), 4);
-    EXPECT_TRUE(list.validate());
-
-    i = list.dequeue();
-    EXPECT_TRUE(i != nullptr);
-    EXPECT_EQ(i->value, 1);
-    EXPECT_TRUE(list.validate());
-
-    i = list.dequeue();
-    EXPECT_TRUE(i != nullptr);
-    EXPECT_EQ(i->value, 2);
-    EXPECT_TRUE(list.validate());
-
-    i = list.dequeue();
-    EXPECT_TRUE(i != nullptr);
-    EXPECT_EQ(i->value, 3);
-    EXPECT_TRUE(list.validate());
-
-    i = list.dequeue();
-    EXPECT_TRUE(i != nullptr);
-    EXPECT_EQ(i->value, 4);
-    EXPECT_TRUE(list.validate());
-
-    list.enqueue(&one);
-    list.enqueue(&two);
-    list.enqueue(&three);
-    list.enqueue(&four);
-
-    IntNode* node = list.head();
-    int val = 1;
-    while (node != nullptr) {
-        EXPECT_EQ(node->value, val);
-        node = node->next();
-        ++val;
-    }
-
-    node = list.tail();
-    val = 4;
-    while (node != nullptr) {
-        EXPECT_EQ(node->value, val);
-        node = node->prev();
-        --val;
-    }
-
-    for (int i = 0; i < 4; ++i) {
-        node = list.pop_back();
-        EXPECT_TRUE(node != nullptr);
-        EXPECT_EQ(node->value, 4 - i);
-        EXPECT_TRUE(list.validate());
-    }
-    EXPECT_TRUE(list.pop_back() == nullptr);
-    EXPECT_EQ(list.size(), 0);
-    EXPECT_TRUE(list.empty());
-}
-
-// Add all the nodes and then remove every other one.
-TEST(InternalQueue, TestRemove) {
-    std::vector<IntNode> nodes;
-    nodes.resize(100);
-
-    InternalQueue<IntNode> queue;
-
-    queue.enqueue(&nodes[0]);
-    queue.remove(&nodes[1]);
-    EXPECT_TRUE(queue.validate());
-    queue.remove(&nodes[0]);
-    EXPECT_TRUE(queue.validate());
-    queue.remove(&nodes[0]);
-    EXPECT_TRUE(queue.validate());
-
-    for (int i = 0; i < nodes.size(); ++i) {
-        nodes[i].value = i;
-        queue.enqueue(&nodes[i]);
-    }
-
-    for (int i = 0; i < nodes.size(); i += 2) {
-        queue.remove(&nodes[i]);
-        EXPECT_TRUE(queue.validate());
-    }
-
-    EXPECT_EQ(queue.size(), nodes.size() / 2);
-    for (int i = 0; i < nodes.size() / 2; ++i) {
-        IntNode* node = queue.dequeue();
-        EXPECT_TRUE(node != nullptr);
-        EXPECT_EQ(node->value, i * 2 + 1);
-    }
-}
-
-const int VALIDATE_INTERVAL = 10000;
-
-// CHECK() is not thread safe so return the result in *failed.
-void ProducerThread(InternalQueue<IntNode>* queue, int num_inserts, 
std::vector<IntNode>* nodes,
-                    std::atomic<int32_t>* counter, bool* failed) {
-    for (int i = 0; i < num_inserts && !*failed; ++i) {
-        // Get the next index to queue.
-        int32_t value = (*counter)++;
-        nodes->at(value).value = value;
-        queue->enqueue(&nodes->at(value));
-        if (i % VALIDATE_INTERVAL == 0) {
-            if (!queue->validate()) {
-                *failed = true;
-            }
-        }
-    }
-}
-
-void ConsumerThread(InternalQueue<IntNode>* queue, int num_consumes, int delta,
-                    std::vector<int>* results, bool* failed) {
-    // Dequeued nodes should be strictly increasing.
-    int previous_value = -1;
-    for (int i = 0; i < num_consumes && !*failed;) {
-        IntNode* node = queue->dequeue();
-        if (node == nullptr) {
-            continue;
-        }
-        ++i;
-        if (delta > 0) {
-            if (node->value != previous_value + delta) {
-                *failed = true;
-            }
-        } else if (delta == 0) {
-            if (node->value <= previous_value) {
-                *failed = true;
-            }
-        }
-        results->push_back(node->value);
-        previous_value = node->value;
-        if (i % VALIDATE_INTERVAL == 0) {
-            if (!queue->validate()) {
-                *failed = true;
-            }
-        }
-    }
-}
-
-TEST(InternalQueue, TestClear) {
-    std::vector<IntNode> nodes;
-    nodes.resize(100);
-    InternalQueue<IntNode> queue;
-    queue.enqueue(&nodes[0]);
-    queue.enqueue(&nodes[1]);
-    queue.enqueue(&nodes[2]);
-
-    queue.clear();
-    EXPECT_TRUE(queue.validate());
-    EXPECT_TRUE(queue.empty());
-
-    queue.enqueue(&nodes[0]);
-    queue.enqueue(&nodes[1]);
-    queue.enqueue(&nodes[2]);
-    EXPECT_TRUE(queue.validate());
-    EXPECT_EQ(queue.size(), 3);
-}
-
-TEST(InternalQueue, TestSingleProducerSingleConsumer) {
-    std::vector<IntNode> nodes;
-    std::atomic<int32_t> counter = 0;
-    nodes.resize(LOOP_LESS_OR_MORE(100, 1000000));
-    std::vector<int> results;
-
-    InternalQueue<IntNode> queue;
-    bool failed = false;
-    ProducerThread(&queue, nodes.size(), &nodes, &counter, &failed);
-    ConsumerThread(&queue, nodes.size(), 1, &results, &failed);
-    EXPECT_TRUE(!failed);
-    EXPECT_TRUE(queue.empty());
-    EXPECT_EQ(results.size(), nodes.size());
-
-    counter = 0;
-    results.clear();
-    thread producer_thread(ProducerThread, &queue, nodes.size(), &nodes, 
&counter, &failed);
-    thread consumer_thread(ConsumerThread, &queue, nodes.size(), 1, &results, 
&failed);
-    producer_thread.join();
-    consumer_thread.join();
-    EXPECT_TRUE(!failed);
-    EXPECT_TRUE(queue.empty());
-    EXPECT_EQ(results.size(), nodes.size());
-}
-
-TEST(InternalQueue, TestMultiProducerMultiConsumer) {
-    std::vector<IntNode> nodes;
-    nodes.resize(LOOP_LESS_OR_MORE(100, 1000000));
-
-    bool failed = false;
-    for (int num_producers = 1; num_producers < 5; num_producers += 3) {
-        std::atomic<int32_t> counter = 0;
-        const int NUM_CONSUMERS = 4;
-        EXPECT_EQ(nodes.size() % NUM_CONSUMERS, 0);
-        EXPECT_EQ(nodes.size() % num_producers, 0);
-        const int num_per_consumer = nodes.size() / NUM_CONSUMERS;
-        const int num_per_producer = nodes.size() / num_producers;
-
-        std::vector<vector<int>> results;
-        results.resize(NUM_CONSUMERS);
-
-        int expected_delta = -1;
-        if (NUM_CONSUMERS == 1 && num_producers == 1) {
-            // With one producer and consumer, the queue should have 
sequential values.
-            expected_delta = 1;
-        } else if (num_producers == 1) {
-            // With one producer, the values added are sequential but can be 
read off
-            // with gaps in each consumer thread.  E.g. thread1 reads: 1, 4, 
5, 7, etc.
-            // but they should be strictly increasing.
-            expected_delta = 0;
-        } else {
-            // With multiple producers there isn't a guarantee on the order 
values get
-            // enqueued.
-            expected_delta = -1;
-        }
-
-        InternalQueue<IntNode> queue;
-        ThreadGroup consumers;
-        ThreadGroup producers;
-
-        for (int i = 0; i < num_producers; ++i) {
-            producers.add_thread(new thread(ProducerThread, &queue, 
num_per_producer, &nodes,
-                                            &counter, &failed));
-        }
-
-        for (int i = 0; i < NUM_CONSUMERS; ++i) {
-            consumers.add_thread(new thread(ConsumerThread, &queue, 
num_per_consumer,
-                                            expected_delta, &results[i], 
&failed));
-        }
-
-        producers.join_all();
-        consumers.join_all();
-        EXPECT_TRUE(queue.empty());
-        EXPECT_TRUE(!failed);
-
-        std::vector<int> all_results;
-        for (int i = 0; i < NUM_CONSUMERS; ++i) {
-            EXPECT_EQ(results[i].size(), num_per_consumer);
-            all_results.insert(all_results.end(), results[i].begin(), 
results[i].end());
-        }
-        EXPECT_EQ(all_results.size(), nodes.size());
-        sort(all_results.begin(), all_results.end());
-        for (int i = 0; i < all_results.size(); ++i) {
-            EXPECT_EQ(i, all_results[i]) << all_results[i - 1] << " " << 
all_results[i + 1];
-        }
-    }
-}
-
-} // end namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to