Hoernchen has submitted this change. ( 
https://gerrit.osmocom.org/c/osmo-trx/+/32946 )

Change subject: ms: use single thread pool
......................................................................

ms: use single thread pool

...so we don't spawn threads all the time.
Used for gain avg/setting.

Change-Id: Id675550f55e8ccbbbe6b0d91fbffd01b6ede15f7
---
M Transceiver52M/Makefile.am
M Transceiver52M/ms/ms.h
M Transceiver52M/ms/ms_rx_lower.cpp
A Transceiver52M/ms/threadpool.h
4 files changed, 134 insertions(+), 18 deletions(-)

Approvals:
  pespin: Looks good to me, but someone else must approve
  Jenkins Builder: Verified
  laforge: Looks good to me, but someone else must approve
  Hoernchen: Looks good to me, approved




diff --git a/Transceiver52M/Makefile.am b/Transceiver52M/Makefile.am
index 875e9a1..296ff05 100644
--- a/Transceiver52M/Makefile.am
+++ b/Transceiver52M/Makefile.am
@@ -97,6 +97,7 @@
        ms/ms_upper.h \
        ms/itrq.h \
        ms/sch.h \
+       ms/threadpool.h \
        grgsm_vitac/viterbi_detector.h \
        grgsm_vitac/constants.h \
        grgsm_vitac/grgsm_vitac.h
diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h
index 7381397..d92f4b7 100644
--- a/Transceiver52M/ms/ms.h
+++ b/Transceiver52M/ms/ms.h
@@ -41,6 +41,7 @@
 #include "Complex.h"
 #include "GSMCommon.h"
 #include "itrq.h"
+#include "threadpool.h"

 const unsigned int ONE_TS_BURST_LEN = (3 + 58 + 26 + 58 + 3 + 8.25) * 4 
/*sps*/;
 const unsigned int NUM_RXQ_FRAMES = 1; // rx thread <-> upper rx queue
@@ -266,6 +267,7 @@
        time_keeper timekeeper;
        int hw_cpus;
        sched_params::target hw_target;
+       single_thread_pool worker_thread;

        void start();
        std::atomic<bool> upper_is_ready;
@@ -291,6 +293,7 @@
                  hw_target(hw_cpus > 4 ? sched_params::target::ODROID : 
sched_params::target::PI4)
        {
                std::cerr << "scheduling for: " << (hw_cpus > 4 ? "odroid" : 
"pi4") << std::endl;
+               set_name_aff_sched(worker_thread.get_handle(), 
sched_params::thread_names::SCH_SEARCH);
        }

        virtual ~ms_trx()
diff --git a/Transceiver52M/ms/ms_rx_lower.cpp 
b/Transceiver52M/ms/ms_rx_lower.cpp
index e8d8e0e..dc0d56d 100644
--- a/Transceiver52M/ms/ms_rx_lower.cpp
+++ b/Transceiver52M/ms/ms_rx_lower.cpp
@@ -30,6 +30,8 @@
 #include "ms.h"
 #include "grgsm_vitac/grgsm_vitac.h"

+#include "threadpool.h"
+
 extern "C" {
 #include "sch.h"
 }
@@ -126,8 +128,11 @@
                gainoffset = runmean < (rxFullScale / 2 ? 2 : 1);
                float newgain = runmean < rx_max_cutoff ? rxgain + gainoffset : 
rxgain - gainoffset;
                // FIXME: gian cutoff
-               if (newgain != rxgain && newgain <= 60)
-                       std::thread([this, newgain] { setRxGain(newgain); 
}).detach();
+               if (newgain != rxgain && newgain <= 60) {
+                       auto gain_fun = [this, newgain] { setRxGain(newgain); };
+                       worker_thread.add_task(gain_fun);
+               }
+
                runmean = 0;
        }
        gain_check = (gain_check + 1) % avgburst_num;
@@ -217,26 +222,36 @@
        return false;
 }

+/*
+accumulates a full big buffer consisting of 8*12 timeslots, then:
+either
+1) adjusts gain if necessary and starts over
+2) searches and finds SCH and is done
+*/
 SCH_STATE ms_trx::search_for_sch(dev_buf_t *rcd)
 {
        static unsigned int sch_pos = 0;
+       auto to_copy = SCH_LEN_SPS - sch_pos;
+
        if (sch_thread_done)
                return SCH_STATE::FOUND;

        if (rcv_done)
                return SCH_STATE::SEARCHING;

-       auto to_copy = SCH_LEN_SPS - sch_pos;
-
-       if (SCH_LEN_SPS == to_copy) // first time
+       if (sch_pos == 0) // keep first ts for time delta calc
                first_sch_buf_rcv_ts = rcd->get_first_ts();

-       if (!to_copy) {
+       if (to_copy) {
+               auto spsmax = rcd->actual_samples_per_buffer();
+               if (to_copy > (unsigned int)spsmax)
+                       sch_pos += rcd->readall(first_sch_buf + sch_pos);
+               else
+                       sch_pos += rcd->read_n(first_sch_buf + sch_pos, 0, 
to_copy);
+       } else { // (!to_copy)
                sch_pos = 0;
                rcv_done = true;
-               std::thread([this] {
-                       
set_name_aff_sched(sched_params::thread_names::SCH_SEARCH);
-
+               auto sch_search_fun = [this] {
                        auto ptr = reinterpret_cast<const int16_t 
*>(first_sch_buf);
                        const auto target_val = rxFullScale / 8;
                        float sum = 0;
@@ -255,16 +270,9 @@

                        if (!sch_thread_done)
                                rcv_done = false; // retry!
-                       return (bool)sch_thread_done;
-               }).detach();
+               };
+               worker_thread.add_task(sch_search_fun);
        }
-
-       auto spsmax = rcd->actual_samples_per_buffer();
-       if (to_copy > (unsigned int)spsmax)
-               sch_pos += rcd->readall(first_sch_buf + sch_pos);
-       else
-               sch_pos += rcd->read_n(first_sch_buf + sch_pos, 0, to_copy);
-
        return SCH_STATE::SEARCHING;
 }

diff --git a/Transceiver52M/ms/threadpool.h b/Transceiver52M/ms/threadpool.h
new file mode 100644
index 0000000..a5dec97
--- /dev/null
+++ b/Transceiver52M/ms/threadpool.h
@@ -0,0 +1,92 @@
+#pragma once
+/*
+ * (C) 2023 by sysmocom s.f.m.c. GmbH <[email protected]>
+ * All Rights Reserved
+ *
+ * Author: Eric Wild <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+#include <functional>
+#include <thread>
+#include <atomic>
+#include <vector>
+#include <future>
+#include <mutex>
+#include <queue>
+
+struct single_thread_pool {
+       std::mutex m;
+       std::condition_variable cv;
+       std::atomic<bool> stop_flag;
+       std::atomic<bool> is_ready;
+       std::deque<std::function<void()>> wq;
+       std::thread worker_thread;
+
+       template <class F>
+       void add_task(F &&f)
+       {
+               std::unique_lock<std::mutex> l(m);
+               wq.emplace_back(std::forward<F>(f));
+               cv.notify_one();
+               return;
+       }
+
+       single_thread_pool() : worker_thread(std::thread([this] { 
thread_loop(); }))
+       {
+       }
+       ~single_thread_pool()
+       {
+               stop();
+       }
+
+       std::thread::native_handle_type get_handle()
+       {
+               return worker_thread.native_handle();
+       }
+
+    private:
+       void stop()
+       {
+               {
+                       std::unique_lock<std::mutex> l(m);
+                       wq.clear();
+                       stop_flag = true;
+                       cv.notify_one();
+               }
+               worker_thread.join();
+       }
+
+       void thread_loop()
+       {
+               while (true) {
+                       is_ready = true;
+                       std::function<void()> f;
+                       {
+                               std::unique_lock<std::mutex> l(m);
+                               if (wq.empty()) {
+                                       cv.wait(l, [&] { return !wq.empty() || 
stop_flag; });
+                               }
+                               if (stop_flag)
+                                       return;
+                               is_ready = false;
+                               f = std::move(wq.front());
+                               wq.pop_front();
+                       }
+                       f();
+               }
+       }
+};
\ No newline at end of file

--
To view, visit https://gerrit.osmocom.org/c/osmo-trx/+/32946
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.osmocom.org/settings

Gerrit-Project: osmo-trx
Gerrit-Branch: master
Gerrit-Change-Id: Id675550f55e8ccbbbe6b0d91fbffd01b6ede15f7
Gerrit-Change-Number: 32946
Gerrit-PatchSet: 4
Gerrit-Owner: Hoernchen <[email protected]>
Gerrit-Reviewer: Hoernchen <[email protected]>
Gerrit-Reviewer: Jenkins Builder
Gerrit-Reviewer: laforge <[email protected]>
Gerrit-Reviewer: pespin <[email protected]>
Gerrit-MessageType: merged

Reply via email to