This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a4924dabb7ee630c0ee7cb294a886734f8b6d7bf
Author: yiguolei <[email protected]>
AuthorDate: Wed Apr 10 16:47:10 2024 +0800

    [enhancement](exception) enable exception logic in pipeline execute thread 
(#33437)
    
    * [enhancement](exception) enable exception logic in pipeline execute thread
    
    * f
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/olap/memtable.h                             |   2 +-
 be/src/olap/skiplist.h                             | 462 ---------------------
 be/src/pipeline/task_scheduler.cpp                 |   4 +
 .../runtime/routine_load/data_consumer_group.cpp   |   3 +-
 be/test/olap/skiplist_test.cpp                     | 423 -------------------
 5 files changed, 7 insertions(+), 887 deletions(-)

diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 6fa140846ac..4ee245af359 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -222,7 +222,7 @@ private:
     // when the sum of all memtable (_insert_manual_mem_tracker + 
_flush_hook_mem_tracker) exceeds the limit.
     std::shared_ptr<MemTracker> _insert_mem_tracker;
     std::shared_ptr<MemTracker> _flush_mem_tracker;
-    // Only the rows will be inserted into SkipList can allocate memory from 
_arena.
+    // Only the rows that will be inserted into the block can allocate memory from 
_arena.
     // In this way, we can make MemTable::memory_usage() to be more accurate, 
and eventually
     // reduce the number of segment files that are generated by current load
     std::unique_ptr<vectorized::Arena> _arena;
diff --git a/be/src/olap/skiplist.h b/be/src/olap/skiplist.h
deleted file mode 100644
index d8e3335e502..00000000000
--- a/be/src/olap/skiplist.h
+++ /dev/null
@@ -1,462 +0,0 @@
-// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style license that can be
-// found in the LICENSE file. See the AUTHORS file for names of contributors.
-
-#pragma once
-
-// Thread safety
-// -------------
-//
-// Writes require external synchronization, most likely a mutex.
-// Reads require a guarantee that the SkipList will not be destroyed
-// while the read is in progress.  Apart from that, reads progress
-// without any internal locking or synchronization.
-//
-// Invariants:
-//
-// (1) Allocated nodes are never deleted until the SkipList is
-// destroyed.  This is trivially guaranteed by the code since we
-// never delete any skip list nodes.
-//
-// (2) The contents of a Node except for the next/prev pointers are
-// immutable after the Node has been linked into the SkipList.
-// Only Insert() modifies the list, and it is careful to initialize
-// a node and use release-stores to publish the nodes in one or
-// more lists.
-//
-// ... prev vs. next pointer ordering ...
-
-#include <gen_cpp/olap_file.pb.h>
-
-#include <atomic>
-
-#include "common/logging.h"
-#include "util/random.h"
-#include "vec/common/arena.h"
-
-namespace doris {
-
-template <typename Key, class Comparator>
-class SkipList {
-private:
-    struct Node;
-    enum { kMaxHeight = 12 };
-
-public:
-    typedef Key key_type;
-    // One Hint object is to show position info of one row.
-    // It is used in the following scenarios:
-    //   // 1. check for existence
-    //   bool is_exist = skiplist->Find(key, &hint);
-    //   // 2. Do something separately based on the value of is_exist
-    //   if (is_exist) {
-    //       do_something1 ();
-    //   } else {
-    //       do_something2 ();
-    //       skiplist->InsertWithHint(key, is_exist, hint);
-    //   }
-    //
-    // Note: The user should guarantee that there must not be any other 
insertion
-    // between calling Find() and InsertWithHint().
-    struct Hint {
-        Node* curr = nullptr;
-        Node* prev[kMaxHeight];
-    };
-
-    // Create a new SkipList object that will use "cmp" for comparing keys,
-    // and will allocate memory using "*arena".
-    // NOTE: Objects allocated in the arena must remain allocated for
-    // the lifetime of the skiplist object.
-    explicit SkipList(Comparator* cmp, vectorized::Arena* arena, bool can_dup);
-
-    // Insert key into the list.
-    void Insert(const Key& key, bool* overwritten);
-    // Use hint to insert a key. the hint is from previous Find()
-    void InsertWithHint(const Key& key, bool is_exist, Hint* hint);
-
-    // Returns true if an entry that compares equal to key is in the list.
-    bool Contains(const Key& key) const;
-    // Like Contains(), but it will return the position info as a hint. We can 
use this
-    // position info to insert directly using InsertWithHint().
-    bool Find(const Key& key, Hint* hint) const;
-
-    // Iteration over the contents of a skip list
-    class Iterator {
-    public:
-        // Initialize an iterator over the specified list.
-        // The returned iterator is not valid.
-        explicit Iterator(const SkipList* list);
-
-        // Returns true if the iterator is positioned at a valid node.
-        bool Valid() const;
-
-        // Returns the key at the current position.
-        // REQUIRES: Valid()
-        const Key& key() const;
-
-        // Advances to the next position.
-        // REQUIRES: Valid()
-        void Next();
-
-        // Advances to the previous position.
-        // REQUIRES: Valid()
-        void Prev();
-
-        // Advance to the first entry with a key >= target
-        void Seek(const Key& target);
-
-        // Position at the first entry in list.
-        // Final state of iterator is Valid() if list is not empty.
-        void SeekToFirst();
-
-        // Position at the last entry in list.
-        // Final state of iterator is Valid() if list is not empty.
-        void SeekToLast();
-
-    private:
-        const SkipList* list_ = nullptr;
-        Node* node_ = nullptr;
-        // Intentionally copyable
-    };
-
-private:
-    // Immutable after construction
-    Comparator* const compare_;
-    // When value is true, means indicates that duplicate values are allowed.
-    bool _can_dup;
-    vectorized::Arena* const _arena; // Arena used for allocations of nodes
-
-    Node* const head_;
-
-    // Modified only by Insert().  Read racily by readers, but stale
-    // values are ok.
-    std::atomic<int> max_height_; // Height of the entire list
-
-    int GetMaxHeight() const { return 
max_height_.load(std::memory_order_relaxed); }
-
-    // Read/written only by Insert().
-    Random rnd_;
-
-    Node* NewNode(const Key& key, int height);
-    int RandomHeight();
-    bool Equal(const Key& a, const Key& b) const { return ((*compare_)(a, b) 
== 0); }
-
-    // Return true if key is greater than the data stored in "n"
-    bool KeyIsAfterNode(const Key& key, Node* n) const;
-
-    // Return the earliest node that comes at or after key.
-    // Return nullptr if there is no such node.
-    //
-    // If prev is non-nullptr, fills prev[level] with pointer to previous
-    // node at "level" for every level in [0..max_height_-1].
-    Node* FindGreaterOrEqual(const Key& key, Node** prev) const;
-
-    // Return the latest node with a key < key.
-    // Return head_ if there is no such node.
-    Node* FindLessThan(const Key& key) const;
-
-    // Return the last node in the list.
-    // Return head_ if list is empty.
-    Node* FindLast() const;
-
-    // No copying allowed
-    SkipList(const SkipList&);
-    void operator=(const SkipList&);
-};
-
-// Implementation details follow
-template <typename Key, class Comparator>
-struct SkipList<Key, Comparator>::Node {
-    explicit Node(const Key& k) : key(k) {}
-
-    Key const key;
-
-    // Accessors/mutators for links.  Wrapped in methods so we can
-    // add the appropriate barriers as necessary.
-    Node* Next(int n) {
-        DCHECK(n >= 0);
-        // Use an 'acquire load' so that we observe a fully initialized
-        // version of the returned Node.
-        return (next_[n].load(std::memory_order_acquire));
-    }
-    void SetNext(int n, Node* x) {
-        DCHECK(n >= 0);
-        // Use a 'release store' so that anybody who reads through this
-        // pointer observes a fully initialized version of the inserted node.
-        next_[n].store(x, std::memory_order_release);
-    }
-
-    // No-barrier variants that can be safely used in a few locations.
-    Node* NoBarrier_Next(int n) {
-        DCHECK(n >= 0);
-        return next_[n].load(std::memory_order_relaxed);
-    }
-    void NoBarrier_SetNext(int n, Node* x) {
-        DCHECK(n >= 0);
-        next_[n].store(x, std::memory_order_relaxed);
-    }
-
-private:
-    // Array of length equal to the node height.  next_[0] is lowest level 
link.
-    std::atomic<Node*> next_[1];
-};
-
-template <typename Key, class Comparator>
-typename SkipList<Key, Comparator>::Node* SkipList<Key, 
Comparator>::NewNode(const Key& key,
-                                                                             
int height) {
-    char* mem = _arena->alloc(sizeof(Node) + sizeof(std::atomic<Node*>) * 
(height - 1));
-    return new (mem) Node(key);
-}
-
-template <typename Key, class Comparator>
-SkipList<Key, Comparator>::Iterator::Iterator(const SkipList* list) {
-    list_ = list;
-    node_ = nullptr;
-}
-
-template <typename Key, class Comparator>
-bool SkipList<Key, Comparator>::Iterator::Valid() const {
-    return node_ != nullptr;
-}
-
-template <typename Key, class Comparator>
-const Key& SkipList<Key, Comparator>::Iterator::key() const {
-    DCHECK(Valid());
-    return node_->key;
-}
-
-template <typename Key, class Comparator>
-void SkipList<Key, Comparator>::Iterator::Next() {
-    DCHECK(Valid());
-    node_ = node_->Next(0);
-}
-
-template <typename Key, class Comparator>
-void SkipList<Key, Comparator>::Iterator::Prev() {
-    // Instead of using explicit "prev" links, we just search for the
-    // last node that falls before key.
-    DCHECK(Valid());
-    node_ = list_->FindLessThan(node_->key);
-    if (node_ == list_->head_) {
-        node_ = nullptr;
-    }
-}
-
-template <typename Key, class Comparator>
-void SkipList<Key, Comparator>::Iterator::Seek(const Key& target) {
-    node_ = list_->FindGreaterOrEqual(target, nullptr);
-}
-
-template <typename Key, class Comparator>
-void SkipList<Key, Comparator>::Iterator::SeekToFirst() {
-    node_ = list_->head_->Next(0);
-}
-
-template <typename Key, class Comparator>
-void SkipList<Key, Comparator>::Iterator::SeekToLast() {
-    node_ = list_->FindLast();
-    if (node_ == list_->head_) {
-        node_ = nullptr;
-    }
-}
-
-template <typename Key, class Comparator>
-int SkipList<Key, Comparator>::RandomHeight() {
-    // Increase height with probability 1 in kBranching
-    static const unsigned int kBranching = 4;
-    int height = 1;
-    while (height < kMaxHeight && ((rnd_.Next() % kBranching) == 0)) {
-        height++;
-    }
-    DCHECK(height > 0);
-    DCHECK(height <= kMaxHeight);
-    return height;
-}
-
-template <typename Key, class Comparator>
-bool SkipList<Key, Comparator>::KeyIsAfterNode(const Key& key, Node* n) const {
-    // nullptr n is considered infinite
-    return (n != nullptr) && ((*compare_)(n->key, key) < 0);
-}
-
-template <typename Key, class Comparator>
-typename SkipList<Key, Comparator>::Node* SkipList<Key, 
Comparator>::FindGreaterOrEqual(
-        const Key& key, Node** prev) const {
-    Node* x = head_;
-    int level = GetMaxHeight() - 1;
-    while (true) {
-        Node* next = x->Next(level);
-        if (KeyIsAfterNode(key, next)) {
-            // Keep searching in this list
-            x = next;
-        } else {
-            if (prev != nullptr) prev[level] = x;
-            if (level == 0) {
-                return next;
-            } else {
-                // Switch to next list
-                level--;
-            }
-        }
-    }
-}
-
-template <typename Key, class Comparator>
-typename SkipList<Key, Comparator>::Node* SkipList<Key, 
Comparator>::FindLessThan(
-        const Key& key) const {
-    Node* x = head_;
-    int level = GetMaxHeight() - 1;
-    while (true) {
-        DCHECK(x == head_ || (*compare_)(x->key, key) < 0);
-        Node* next = x->Next(level);
-        if (next == nullptr || (*compare_)(next->key, key) >= 0) {
-            if (level == 0) {
-                return x;
-            } else {
-                // Switch to next list
-                level--;
-            }
-        } else {
-            x = next;
-        }
-    }
-}
-
-template <typename Key, class Comparator>
-typename SkipList<Key, Comparator>::Node* SkipList<Key, 
Comparator>::FindLast() const {
-    Node* x = head_;
-    int level = GetMaxHeight() - 1;
-    while (true) {
-        Node* next = x->Next(level);
-        if (next == nullptr) {
-            if (level == 0) {
-                return x;
-            } else {
-                // Switch to next list
-                level--;
-            }
-        } else {
-            x = next;
-        }
-    }
-}
-
-template <typename Key, class Comparator>
-SkipList<Key, Comparator>::SkipList(Comparator* cmp, vectorized::Arena* arena, 
bool can_dup)
-        : compare_(cmp),
-          _can_dup(can_dup),
-          _arena(arena),
-          head_(NewNode(0 /* any key will do */, kMaxHeight)),
-          max_height_(1),
-          rnd_(0xdeadbeef) {
-    for (int i = 0; i < kMaxHeight; i++) {
-        head_->SetNext(i, nullptr);
-    }
-}
-
-template <typename Key, class Comparator>
-void SkipList<Key, Comparator>::Insert(const Key& key, bool* overwritten) {
-    // TODO(opt): We can use a barrier-free variant of FindGreaterOrEqual()
-    // here since Insert() is externally synchronized.
-    Node* prev[kMaxHeight];
-    Node* x = FindGreaterOrEqual(key, prev);
-
-#ifndef BE_TEST
-    // The key already exists and duplicate keys are not allowed, so we need 
to aggregate them
-    if (!_can_dup && x != nullptr && Equal(key, x->key)) {
-        *overwritten = true;
-        return;
-    }
-#endif
-
-    *overwritten = false;
-    // Our data structure does not allow duplicate insertion
-    int height = RandomHeight();
-    if (height > GetMaxHeight()) {
-        for (int i = GetMaxHeight(); i < height; i++) {
-            prev[i] = head_;
-        }
-        //fprintf(stderr, "Change height from %d to %d\n", max_height_, 
height);
-
-        // It is ok to mutate max_height_ without any synchronization
-        // with concurrent readers.  A concurrent reader that observes
-        // the new value of max_height_ will see either the old value of
-        // new level pointers from head_ (nullptr), or a new value set in
-        // the loop below.  In the former case the reader will
-        // immediately drop to the next level since nullptr sorts after all
-        // keys.  In the latter case the reader will use the new node.
-        max_height_.store(height, std::memory_order_relaxed);
-    }
-
-    x = NewNode(key, height);
-    for (int i = 0; i < height; i++) {
-        // NoBarrier_SetNext() suffices since we will add a barrier when
-        // we publish a pointer to "x" in prev[i].
-        x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
-        prev[i]->SetNext(i, x);
-    }
-}
-
-// NOTE: Already be checked, the row is exist.
-template <typename Key, class Comparator>
-void SkipList<Key, Comparator>::InsertWithHint(const Key& key, bool is_exist, 
Hint* hint) {
-    Node* x = hint->curr;
-    DCHECK(!is_exist || x) << "curr pointer must not be null if row exists";
-
-#ifndef BE_TEST
-    // The key already exists and duplicate keys are not allowed, so we need 
to aggregate them
-    if (!_can_dup && is_exist) {
-        return;
-    }
-#endif
-
-    Node** prev = hint->prev;
-    // Our data structure does not allow duplicate insertion
-    int height = RandomHeight();
-    if (height > GetMaxHeight()) {
-        for (int i = GetMaxHeight(); i < height; i++) {
-            prev[i] = head_;
-        }
-        //fprintf(stderr, "Change height from %d to %d\n", max_height_, 
height);
-
-        // It is ok to mutate max_height_ without any synchronization
-        // with concurrent readers.  A concurrent reader that observes
-        // the new value of max_height_ will see either the old value of
-        // new level pointers from head_ (nullptr), or a new value set in
-        // the loop below.  In the former case the reader will
-        // immediately drop to the next level since nullptr sorts after all
-        // keys.  In the latter case the reader will use the new node.
-        max_height_.store(height, std::memory_order_relaxed);
-    }
-
-    x = NewNode(key, height);
-    for (int i = 0; i < height; i++) {
-        // NoBarrier_SetNext() suffices since we will add a barrier when
-        // we publish a pointer to "x" in prev[i].
-        x->NoBarrier_SetNext(i, prev[i]->NoBarrier_Next(i));
-        prev[i]->SetNext(i, x);
-    }
-}
-
-template <typename Key, class Comparator>
-bool SkipList<Key, Comparator>::Contains(const Key& key) const {
-    Node* x = FindGreaterOrEqual(key, nullptr);
-    if (x != nullptr && Equal(key, x->key)) {
-        return true;
-    } else {
-        return false;
-    }
-}
-
-template <typename Key, class Comparator>
-bool SkipList<Key, Comparator>::Find(const Key& key, Hint* hint) const {
-    Node* x = FindGreaterOrEqual(key, hint->prev);
-    hint->curr = x;
-    if (x != nullptr && Equal(key, x->key)) {
-        return true;
-    } else {
-        return false;
-    }
-}
-
-} // namespace doris
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 205eaa686b6..c40193cb0ea 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -313,6 +313,10 @@ void TaskScheduler::_do_work(size_t index) {
         auto status = Status::OK();
 
         try {
+            // This will enable exception handling logic in allocator.h when 
memory allocation
+            // fails or system memory is not sufficient.
+            doris::enable_thread_catch_bad_alloc++;
+            Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};
             //TODO: use a better enclose to abstracting these
             if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) {
                 TUniqueId query_id = task->query_context()->query_id();
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp 
b/be/src/runtime/routine_load/data_consumer_group.cpp
index 60e7c57a6c1..8d07b0ec81a 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -158,6 +158,8 @@ Status 
KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
         RdKafka::Message* msg;
         bool res = _queue.blocking_get(&msg);
         if (res) {
+            // msg has to be deleted finally
+            Defer delete_msg {[msg]() { delete msg; }};
             VLOG_NOTICE << "get kafka message"
                         << ", partition: " << msg->partition() << ", offset: " 
<< msg->offset()
                         << ", len: " << msg->len();
@@ -181,7 +183,6 @@ Status 
KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
                     }
                 }
             }
-            delete msg;
         } else {
             // queue is empty and shutdown
             eos = true;
diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp
deleted file mode 100644
index 55c1b28bc53..00000000000
--- a/be/test/olap/skiplist_test.cpp
+++ /dev/null
@@ -1,423 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap/skiplist.h"
-
-#include <gtest/gtest-message.h>
-#include <gtest/gtest-test-part.h>
-#include <stdint.h>
-#include <stdio.h>
-
-#include <condition_variable>
-#include <functional>
-#include <iterator>
-#include <memory>
-#include <mutex>
-#include <set>
-#include <utility>
-
-#include "gtest/gtest_pred_impl.h"
-#include "testutil/test_util.h"
-#include "util/hash_util.hpp"
-#include "util/random.h"
-#include "util/work_thread_pool.hpp"
-#include "vec/common/arena.h"
-
-namespace doris {
-
-typedef uint64_t Key;
-const int random_seed = 301;
-
-struct TestComparator {
-    int operator()(const Key& a, const Key& b) const {
-        if (a < b) {
-            return -1;
-        } else if (a > b) {
-            return +1;
-        } else {
-            return 0;
-        }
-    }
-};
-
-class SkipTest : public testing::Test {};
-
-TEST_F(SkipTest, Empty) {
-    std::unique_ptr<vectorized::Arena> arena(new vectorized::Arena());
-
-    TestComparator* cmp = new TestComparator();
-    SkipList<Key, TestComparator> list(cmp, arena.get(), false);
-    EXPECT_TRUE(!list.Contains(10));
-
-    SkipList<Key, TestComparator>::Iterator iter(&list);
-    EXPECT_TRUE(!iter.Valid());
-    iter.SeekToFirst();
-    EXPECT_TRUE(!iter.Valid());
-    iter.Seek(100);
-    EXPECT_TRUE(!iter.Valid());
-    iter.SeekToLast();
-    EXPECT_TRUE(!iter.Valid());
-    delete cmp;
-}
-
-TEST_F(SkipTest, InsertAndLookup) {
-    std::unique_ptr<vectorized::Arena> arena(new vectorized::Arena());
-
-    const int N = 2000;
-    const int R = 5000;
-    Random rnd(1000);
-    std::set<Key> keys;
-    TestComparator* cmp = new TestComparator();
-    SkipList<Key, TestComparator> list(cmp, arena.get(), false);
-    for (int i = 0; i < N; i++) {
-        Key key = rnd.Next() % R;
-        if (keys.insert(key).second) {
-            bool overwritten = false;
-            list.Insert(key, &overwritten);
-        }
-    }
-
-    for (int i = 0; i < R; i++) {
-        if (list.Contains(i)) {
-            EXPECT_EQ(keys.count(i), 1);
-        } else {
-            EXPECT_EQ(keys.count(i), 0);
-        }
-    }
-
-    // Simple iterator tests
-    {
-        SkipList<Key, TestComparator>::Iterator iter(&list);
-        EXPECT_TRUE(!iter.Valid());
-
-        iter.Seek(0);
-        EXPECT_TRUE(iter.Valid());
-        EXPECT_EQ(*(keys.begin()), iter.key());
-
-        iter.SeekToFirst();
-        EXPECT_TRUE(iter.Valid());
-        EXPECT_EQ(*(keys.begin()), iter.key());
-
-        iter.SeekToLast();
-        EXPECT_TRUE(iter.Valid());
-        EXPECT_EQ(*(keys.rbegin()), iter.key());
-    }
-
-    // Forward iteration test
-    for (int i = 0; i < R; i++) {
-        SkipList<Key, TestComparator>::Iterator iter(&list);
-        iter.Seek(i);
-
-        // Compare against model iterator
-        std::set<Key>::iterator model_iter = keys.lower_bound(i);
-        for (int j = 0; j < 3; j++) {
-            if (model_iter == keys.end()) {
-                EXPECT_TRUE(!iter.Valid());
-                break;
-            } else {
-                EXPECT_TRUE(iter.Valid());
-                EXPECT_EQ(*model_iter, iter.key());
-                ++model_iter;
-                iter.Next();
-            }
-        }
-    }
-
-    // Backward iteration test
-    {
-        SkipList<Key, TestComparator>::Iterator iter(&list);
-        iter.SeekToLast();
-
-        // Compare against model iterator
-        for (std::set<Key>::reverse_iterator model_iter = keys.rbegin(); 
model_iter != keys.rend();
-             ++model_iter) {
-            EXPECT_TRUE(iter.Valid());
-            EXPECT_EQ(*model_iter, iter.key());
-            iter.Prev();
-        }
-        EXPECT_TRUE(!iter.Valid());
-    }
-    delete cmp;
-}
-
-// Only non-DUP model will use Find() and InsertWithHint().
-TEST_F(SkipTest, InsertWithHintNoneDupModel) {
-    std::unique_ptr<vectorized::Arena> arena(new vectorized::Arena());
-
-    const int N = 2000;
-    const int R = 5000;
-    Random rnd(1000);
-    std::set<Key> keys;
-    TestComparator* cmp = new TestComparator();
-    SkipList<Key, TestComparator> list(cmp, arena.get(), false);
-    SkipList<Key, TestComparator>::Hint hint;
-    for (int i = 0; i < N; i++) {
-        Key key = rnd.Next() % R;
-        bool is_exist = list.Find(key, &hint);
-        if (keys.insert(key).second) {
-            EXPECT_FALSE(is_exist);
-            list.InsertWithHint(key, is_exist, &hint);
-        } else {
-            EXPECT_TRUE(is_exist);
-        }
-    }
-
-    for (int i = 0; i < R; i++) {
-        if (list.Contains(i)) {
-            EXPECT_EQ(keys.count(i), 1);
-        } else {
-            EXPECT_EQ(keys.count(i), 0);
-        }
-    }
-    delete cmp;
-}
-
-// We want to make sure that with a single writer and multiple
-// concurrent readers (with no synchronization other than when a
-// reader's iterator is created), the reader always observes all the
-// data that was present in the skip list when the iterator was
-// constructor.  Because insertions are happening concurrently, we may
-// also observe new values that were inserted since the iterator was
-// constructed, but we should never miss any values that were present
-// at iterator construction time.
-//
-// We generate multi-part keys:
-//     <key,gen,hash>
-// where:
-//     key is in range [0..K-1]
-//     gen is a generation number for key
-//     hash is hash(key,gen)
-//
-// The insertion code picks a random key, sets gen to be 1 + the last
-// generation number inserted for that key, and sets hash to Hash(key,gen).
-//
-// At the beginning of a read, we snapshot the last inserted
-// generation number for each key.  We then iterate, including random
-// calls to Next() and Seek().  For every key we encounter, we
-// check that it is either expected given the initial snapshot or has
-// been concurrently added since the iterator started.
-class ConcurrentTest {
-private:
-    static const uint32_t K = 4;
-
-    static uint64_t key(Key key) { return (key >> 40); }
-    static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
-    static uint64_t hash(Key key) { return key & 0xff; }
-
-    static uint64_t hash_numbers(uint64_t k, uint64_t g) {
-        uint64_t data[2] = {k, g};
-        return HashUtil::hash(reinterpret_cast<char*>(data), sizeof(data), 0);
-    }
-
-    static Key make_key(uint64_t k, uint64_t g) {
-        EXPECT_EQ(sizeof(Key), sizeof(uint64_t));
-        EXPECT_LE(k, K); // We sometimes pass K to seek to the end of the 
skiplist
-        EXPECT_LE(g, 0xffffffffu);
-        return ((k << 40) | (g << 8) | (hash_numbers(k, g) & 0xff));
-    }
-
-    static bool is_valid_key(Key k) { return hash(k) == (hash_numbers(key(k), 
gen(k)) & 0xff); }
-
-    static Key random_target(Random* rnd) {
-        switch (rnd->Next() % 10) {
-        case 0:
-            // Seek to beginning
-            return make_key(0, 0);
-        case 1:
-            // Seek to end
-            return make_key(K, 0);
-        default:
-            // Seek to middle
-            return make_key(rnd->Next() % K, 0);
-        }
-    }
-
-    // Per-key generation
-    struct State {
-        std::atomic<int> generation[K];
-        void set(int k, int v) { generation[k].store(v, 
std::memory_order_release); }
-        int get(int k) { return generation[k].load(std::memory_order_acquire); 
}
-
-        State() {
-            for (int k = 0; k < K; k++) {
-                set(k, 0);
-            }
-        }
-    };
-
-    // Current state of the test
-    State _current;
-
-    std::unique_ptr<vectorized::Arena> _arena;
-    std::shared_ptr<TestComparator> _comparator;
-    // SkipList is not protected by _mu.  We just use a single writer
-    // thread to modify it.
-    SkipList<Key, TestComparator> _list;
-
-public:
-    ConcurrentTest()
-            : _arena(new vectorized::Arena()),
-              _comparator(new TestComparator()),
-              _list(_comparator.get(), _arena.get(), false) {}
-
-    // REQUIRES: External synchronization
-    void write_step(Random* rnd) {
-        const uint32_t k = rnd->Next() % K;
-        const int g = _current.get(k) + 1;
-        const Key new_key = make_key(k, g);
-        bool overwritten = false;
-        _list.Insert(new_key, &overwritten);
-        _current.set(k, g);
-    }
-
-    void read_step(Random* rnd) {
-        // Remember the initial committed state of the skiplist.
-        State initial_state;
-        for (int k = 0; k < K; k++) {
-            initial_state.set(k, _current.get(k));
-        }
-
-        Key pos = random_target(rnd);
-        SkipList<Key, TestComparator>::Iterator iter(&_list);
-        iter.Seek(pos);
-        while (true) {
-            Key current;
-            if (!iter.Valid()) {
-                current = make_key(K, 0);
-            } else {
-                current = iter.key();
-                EXPECT_TRUE(is_valid_key(current)) << current;
-            }
-            EXPECT_LE(pos, current) << "should not go backwards";
-
-            // Verify that everything in [pos,current) was not present in
-            // initial_state.
-            while (pos < current) {
-                EXPECT_LT(key(pos), K) << pos;
-
-                // Note that generation 0 is never inserted, so it is ok if
-                // <*,0,*> is missing.
-                EXPECT_TRUE((gen(pos) == 0) ||
-                            (gen(pos) > 
static_cast<Key>(initial_state.get(key(pos)))))
-                        << "key: " << key(pos) << "; gen: " << gen(pos)
-                        << "; initgen: " << initial_state.get(key(pos));
-
-                // Advance to next key in the valid key space
-                if (key(pos) < key(current)) {
-                    pos = make_key(key(pos) + 1, 0);
-                } else {
-                    pos = make_key(key(pos), gen(pos) + 1);
-                }
-            }
-
-            if (!iter.Valid()) {
-                break;
-            }
-
-            if (rnd->Next() % 2) {
-                iter.Next();
-                pos = make_key(key(pos), gen(pos) + 1);
-            } else {
-                Key new_target = random_target(rnd);
-                if (new_target > pos) {
-                    pos = new_target;
-                    iter.Seek(new_target);
-                }
-            }
-        }
-    }
-};
-const uint32_t ConcurrentTest::K;
-
-// Simple test that does single-threaded testing of the ConcurrentTest
-// scaffolding.
-TEST_F(SkipTest, ConcurrentWithoutThreads) {
-    ConcurrentTest test;
-    Random rnd(random_seed);
-    for (int i = 0; i < 10000; i++) {
-        test.read_step(&rnd);
-        test.write_step(&rnd);
-    }
-}
-
-class TestState {
-public:
-    ConcurrentTest _t;
-    int _seed;
-    std::atomic<bool> _quit_flag;
-
-    enum ReaderState { STARTING, RUNNING, DONE };
-
-    explicit TestState(int s) : _seed(s), _quit_flag(false), _state(STARTING) 
{}
-
-    void wait(ReaderState s) {
-        std::unique_lock l(_mu);
-        while (_state != s) {
-            _cv_state.wait(l);
-        }
-    }
-
-    void change(ReaderState s) {
-        std::lock_guard l(_mu);
-        _state = s;
-        _cv_state.notify_one();
-    }
-
-private:
-    std::mutex _mu;
-    ReaderState _state;
-    std::condition_variable _cv_state;
-};
-
-static void concurrent_reader(void* arg) {
-    TestState* state = reinterpret_cast<TestState*>(arg);
-    Random rnd(state->_seed);
-    state->change(TestState::RUNNING);
-    while (!state->_quit_flag.load(std::memory_order_acquire)) {
-        state->_t.read_step(&rnd);
-    }
-    state->change(TestState::DONE);
-}
-
-static void run_concurrent(int run) {
-    const int seed = random_seed + (run * 100);
-    Random rnd(seed);
-    const int N = LOOP_LESS_OR_MORE(10, 1000);
-    const int kSize = 1000;
-    PriorityThreadPool thread_pool(10, 100, "ut");
-    for (int i = 0; i < N; i++) {
-        if ((i % 100) == 0) {
-            fprintf(stderr, "Run %d of %d\n", i, N);
-        }
-        TestState state(seed + 1);
-        thread_pool.offer(std::bind<void>(concurrent_reader, &state));
-        state.wait(TestState::RUNNING);
-        for (int i = 0; i < kSize; i++) {
-            state._t.write_step(&rnd);
-        }
-        state._quit_flag.store(true, std::memory_order_release); // Any 
non-nullptr arg will do
-        state.wait(TestState::DONE);
-    }
-}
-
-TEST_F(SkipTest, Concurrent) {
-    for (int i = 1; i < LOOP_LESS_OR_MORE(2, 6); ++i) {
-        run_concurrent(i);
-    }
-}
-
-} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to