Hoernchen has uploaded this change for review. ( https://gerrit.osmocom.org/c/osmo-trx/+/32954 )


Change subject: ms: first exit fix using queue timeouts
......................................................................

ms: first exit fix using queue timeouts

Change-Id: I0b8deebc63cf4d936666fd68e1666d1917e89a5d
---
M Transceiver52M/ms/bladerf_specific.h
M Transceiver52M/ms/itrq.h
M Transceiver52M/ms/ms.cpp
M Transceiver52M/ms/ms.h
M Transceiver52M/ms/ms_rx_lower.cpp
M Transceiver52M/ms/ms_upper.cpp
M Transceiver52M/ms/ms_upper.h
7 files changed, 94 insertions(+), 39 deletions(-)



  git pull ssh://gerrit.osmocom.org:29418/osmo-trx refs/changes/54/32954/1

diff --git a/Transceiver52M/ms/bladerf_specific.h b/Transceiver52M/ms/bladerf_specific.h
index 3dc4777..e32d77c 100644
--- a/Transceiver52M/ms/bladerf_specific.h
+++ b/Transceiver52M/ms/bladerf_specific.h
@@ -192,7 +192,7 @@
        struct bladerf_stream *rx_stream;
        struct bladerf_stream *tx_stream;
        // using pkt2buf = blade_otw_buffer<2, blade_speed_buffer_type::SS>;
-       using tx_buf_q_type = spsc_cond<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>;
+       using tx_buf_q_type = spsc_cond_timeout<BLADE_NUM_BUFFERS, dev_buf_t *, true, false>;
        const unsigned int rxFullScale, txFullScale;
        const int rxtxdelay;

diff --git a/Transceiver52M/ms/itrq.h b/Transceiver52M/ms/itrq.h
index 1d9e217..69ff515 100644
--- a/Transceiver52M/ms/itrq.h
+++ b/Transceiver52M/ms/itrq.h
@@ -29,7 +29,58 @@

 namespace spsc_detail
 {
-template <bool block_read, bool block_write> class spsc_cond_detail {
+template <bool block_read, bool block_write>
+class spsc_cond_timeout_detail {
+       std::condition_variable cond_r, cond_w;
+       std::mutex lr, lw;
+       std::atomic_int r_flag, w_flag;
+       const int timeout_ms = 200;
+
+    public:
+       explicit spsc_cond_timeout_detail() : r_flag(0), w_flag(0)
+       {
+       }
+
+       ~spsc_cond_timeout_detail()
+       {
+       }
+
+       ssize_t spsc_check_r()
+       {
+               std::unique_lock<std::mutex> lk(lr);
+               if (cond_r.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return r_flag != 0; })) {
+                       r_flag--;
+                       return 1;
+               } else {
+                       return 0;
+               }
+       }
+       ssize_t spsc_check_w()
+       {
+               std::unique_lock<std::mutex> lk(lw);
+               if (cond_w.wait_for(lk, std::chrono::milliseconds(timeout_ms), [&] { return w_flag != 0; })) {
+                       w_flag--;
+                       return 1;
+               } else {
+                       return 0;
+               }
+       }
+       void spsc_notify_r()
+       {
+               std::unique_lock<std::mutex> lk(lr);
+               r_flag++;
+               cond_r.notify_one();
+       }
+       void spsc_notify_w()
+       {
+               std::unique_lock<std::mutex> lk(lw);
+               w_flag++;
+               cond_w.notify_one();
+       }
+};
+
+template <bool block_read, bool block_write>
+class spsc_cond_detail {
        std::condition_variable cond_r, cond_w;
        std::mutex lr, lw;
        std::atomic_int r_flag, w_flag;
@@ -74,7 +125,8 @@
 };

 // originally designed for select loop integration
-template <bool block_read, bool block_write> class spsc_efd_detail {
+template <bool block_read, bool block_write>
+class spsc_efd_detail {
        int efd_r, efd_w; /* eventfds used to block/notify readers/writers */

     public:
@@ -191,4 +243,7 @@
 template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
 class spsc_evfd : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_efd_detail> {};
 template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
-class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {};
\ No newline at end of file
+class spsc_cond : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_detail> {};
+template <unsigned int SZ, typename ELEM, bool block_read, bool block_write>
+class spsc_cond_timeout
+       : public spsc_detail::spsc<SZ, ELEM, block_read, block_write, spsc_detail::spsc_cond_timeout_detail> {};
\ No newline at end of file
diff --git a/Transceiver52M/ms/ms.cpp b/Transceiver52M/ms/ms.cpp
index 2e91cae..e587c05 100644
--- a/Transceiver52M/ms/ms.cpp
+++ b/Transceiver52M/ms/ms.cpp
@@ -78,7 +78,7 @@
        };
 }

-void ms_trx::start()
+void ms_trx::start_lower_ms()
 {
        if (stop_lower_threads_flag)
                return;
diff --git a/Transceiver52M/ms/ms.h b/Transceiver52M/ms/ms.h
index efccffc..8ca9b02 100644
--- a/Transceiver52M/ms/ms.h
+++ b/Transceiver52M/ms/ms.h
@@ -117,7 +117,7 @@
        };
 };

-using rx_queue_t = spsc_cond<8 * NUM_RXQ_FRAMES, one_burst, true, false>;
+using rx_queue_t = spsc_cond_timeout<8 * NUM_RXQ_FRAMES, one_burst, true, false>;

 enum class SCH_STATE { SEARCHING, FOUND };

@@ -267,7 +267,7 @@
        sched_params::target hw_target;
        single_thread_pool worker_thread;

-       void start();
+       void start_lower_ms();
        std::atomic<bool> upper_is_ready;
        void set_upper_ready(bool is_ready);

diff --git a/Transceiver52M/ms/ms_rx_lower.cpp b/Transceiver52M/ms/ms_rx_lower.cpp
index 4d6ce18..26ee131 100644
--- a/Transceiver52M/ms/ms_rx_lower.cpp
+++ b/Transceiver52M/ms/ms_rx_lower.cpp
@@ -142,10 +142,8 @@
                memcpy(brst.sch_bits, sch_demod_bits, sizeof(sch_demod_bits));
        }

-       if (upper_is_ready) { // this is blocking, so only submit if there is a reader - only if upper exists!
-               while (!rxqueue.spsc_push(&brst))
-                       ;
-       }
+       while (upper_is_ready && !rxqueue.spsc_push(&brst))
+               ;

        if (do_auto_gain)
                maybe_update_gain(brst);
diff --git a/Transceiver52M/ms/ms_upper.cpp b/Transceiver52M/ms/ms_upper.cpp
index a10d542..4b2f919 100644
--- a/Transceiver52M/ms/ms_upper.cpp
+++ b/Transceiver52M/ms/ms_upper.cpp
@@ -80,37 +80,16 @@
                while (!g_exit_flag) {
                        driveControl();
                }
-               std::cerr << "exit control!" << std::endl;
+               std::cerr << "exit U control!" << std::endl;
        });
-       msleep(1);
        thr_tx = std::thread([this] {
                set_name_aff_sched(sched_params::thread_names::U_TX);
                while (!g_exit_flag) {
                        driveTx();
                }
-               std::cerr << "exit tx U!" << std::endl;
+               std::cerr << "exit U tx!" << std::endl;
        });

-       // atomic ensures data is not written to q until loop reads
-       start_lower_ms();
-
-       set_name_aff_sched(sched_params::thread_names::U_RX);
-       while (!g_exit_flag) {
-               // set_upper_ready(true) needs to happen during cmd handling:
-               // the main loop is driven by rx, so unless rx is on AND transceiver is on we get stuck..
-               driveReceiveFIFO();
-               osmo_select_main(1);
-
-               trxcon_phyif_rsp r;
-               if (cmdq_from_phy.spsc_pop(&r)) {
-                       DBGLG() << "HAVE RESP:" << r.type << std::endl;
-                       trxcon_phyif_handle_rsp(g_trxcon, &r);
-               }
-       }
-       set_upper_ready(false);
-       std::cerr << "exit rx U!" << std::endl;
-       mOn = false;
-
 #ifdef LSANDEBUG
        std::thread([this] {
                set_name_aff_sched(sched_params::thread_names::LEAKCHECK);
@@ -123,9 +102,23 @@
 #endif
 }

-void upper_trx::start_lower_ms()
+void upper_trx::main_loop()
 {
-       ms_trx::start();
+       set_name_aff_sched(sched_params::thread_names::U_RX);
+       set_upper_ready(true);
+       while (!g_exit_flag) {
+               driveReceiveFIFO();
+               osmo_select_main(1);
+
+               trxcon_phyif_rsp r;
+               if (cmdq_from_phy.spsc_pop(&r)) {
+                       DBGLG() << "HAVE RESP:" << r.type << std::endl;
+                       trxcon_phyif_handle_rsp(g_trxcon, &r);
+               }
+       }
+       set_upper_ready(false);
+       std::cerr << "exit U rx!" << std::endl;
+       mOn = false;
 }

 // signalvector is owning despite claiming not to, but we can pretend, too..
@@ -346,7 +339,7 @@
        case TRXCON_PHYIF_CMDT_POWERON:
                if (!mOn) {
                        mOn = true;
-                       set_upper_ready(true);
+                       start_lower_ms();
                }
                break;
        case TRXCON_PHYIF_CMDT_POWEROFF:
@@ -430,7 +423,7 @@

        // blocking, will return when global exit is requested
        trx->start_threads();
-
+       trx->main_loop();
        trx->stop_threads();
        trx->stop_upper_threads();

diff --git a/Transceiver52M/ms/ms_upper.h b/Transceiver52M/ms/ms_upper.h
index bc9bd14..2362365 100644
--- a/Transceiver52M/ms/ms_upper.h
+++ b/Transceiver52M/ms/ms_upper.h
@@ -41,7 +41,7 @@

     public:
        void start_threads();
-       void start_lower_ms();
+       void main_loop();
        void stop_upper_threads();

        upper_trx(){};

-- 
To view, visit https://gerrit.osmocom.org/c/osmo-trx/+/32954
To unsubscribe, or for help writing mail filters, visit https://gerrit.osmocom.org/settings

Gerrit-Project: osmo-trx
Gerrit-Branch: master
Gerrit-Change-Id: I0b8deebc63cf4d936666fd68e1666d1917e89a5d
Gerrit-Change-Number: 32954
Gerrit-PatchSet: 1
Gerrit-Owner: Hoernchen <[email protected]>
Gerrit-MessageType: newchange

Reply via email to