Hi all,

in certain conditions it may also be useful to have MO throughput control, i.e. to regulate how many messages we want to handle on the MO side (including DLRs).

For this purpose I have prepared the attached patchset, that introduces the optional splitting of the config directives:

  group = smsc
  throughput = X

would mean X as both the MT and the MO throughput constraint, and

  group = smsc
  throughput-mt = X
  throughput-mo = Y

where MT-wise we have an X TPS constraint and MO-wise a Y TPS constraint.

So far, only the SMSC HTTP and SMSC SMPP modules implement MO throughput control in this patchset. Other modules may follow if required.

Please review, comments as always welcome. If no objections, will commit next week.

(ONE argument that may be a reason to decline is that the behavior WOULD change for current setups, i.e. for SMPP connections this would imply that we would also have MO throughput control for DLRs if the 'throughput = x' value is configured. A way to solve this is to keep 'throughput' for MT only, and introduce 'throughput-mo' as the new directive for MO control. Comments?)

Thanks,
Stipe

--
Best Regards,
Stipe

-------------------------------------------------------------------
Koelner Landstrasse 419
40589 Düsseldorf, NRW, Germany

tolj.org system architecture      Kannel Software Foundation (KSF)
http://www.tolj.org/              http://www.kannel.org/

mailto:st_{at}_tolj.org           mailto:stolj_{at}_kannel.org
-------------------------------------------------------------------
Index: gw/smsc/smsc_at.c
===================================================================
--- gw/smsc/smsc_at.c   (revision 5324)
+++ gw/smsc/smsc_at.c   (working copy)
@@ -2215,9 +2215,9 @@
     if (privdata->modem->enable_mms && 
gw_prioqueue_len(privdata->outgoing_queue) > 1)                  
         at2_send_modem_command(privdata, "AT+CMMS=2", 0, 0);
 
-    if (privdata->conn->throughput > 0 && load_get(privdata->load, 0) >= 
privdata->conn->throughput) {
-      debug("bb.sms.at2", 0, "AT2[%s]: throughput limit exceeded (load: %.02f, 
throughput: %.02f)",
-            octstr_get_cstr(privdata->conn->id), load_get(privdata->load, 0), 
privdata->conn->throughput);
+    if (privdata->conn->throughput_mt > 0 && load_get(privdata->load, 0) >= 
privdata->conn->throughput_mt) {
+      debug("bb.sms.at2", 0, "AT2[%s]: MT throughput limit exceeded (load: 
%.02f, throughput: %.02f)",
+            octstr_get_cstr(privdata->conn->id), load_get(privdata->load, 0), 
privdata->conn->throughput_mt);
     } else {
       if ((msg = gw_prioqueue_remove(privdata->outgoing_queue))) {             
    
           load_increase(privdata->load);
Index: gw/smsc/smsc_emi.c
===================================================================
--- gw/smsc/smsc_emi.c  (revision 5324)
+++ gw/smsc/smsc_emi.c  (working copy)
@@ -1008,8 +1008,8 @@
     Msg *msg;
     double delay = 0;
 
-    if (conn->throughput > 0) {
-        delay = 1.0 / conn->throughput;
+    if (conn->throughput_mt > 0) {
+        delay = 1.0 / conn->throughput_mt;
     }
     
     /* Send messages if there's room in the sending window */
@@ -1017,7 +1017,7 @@
            (msg = gw_prioqueue_remove(PRIVDATA(conn)->outgoing_queue)) != 
NULL) {
         int nexttrn = emi2_next_trn(conn);
 
-        if (conn->throughput > 0)
+        if (conn->throughput_mt > 0)
             gwthread_sleep(delay);
 
         /* convert the generic Kannel message into an EMI type message */
Index: gw/smsc/smsc_fake.c
===================================================================
--- gw/smsc/smsc_fake.c (revision 5324)
+++ gw/smsc/smsc_fake.c (working copy)
@@ -268,8 +268,8 @@
     Msg        *msg;
     double delay = 0;
 
-    if (conn->throughput > 0) {
-        delay = 1.0 / conn->throughput;
+    if (conn->throughput_mt > 0) {
+        delay = 1.0 / conn->throughput_mt;
     }
 
     while (1) {
@@ -335,7 +335,7 @@
             }
 
             /* obey throughput speed limit, if any */
-            if (conn->throughput > 0) {
+            if (conn->throughput_mt > 0) {
                 gwthread_sleep(delay);
             }
         }
Index: gw/smsc/smsc_http.c
===================================================================
--- gw/smsc/smsc_http.c (revision 5324)
+++ gw/smsc/smsc_http.c (working copy)
@@ -165,7 +165,13 @@
     HTTPClient *client;
     Octstr *ip, *url, *body;
     List *headers, *cgivars;
+    double delay = 0;
 
+    /* throughput delay, if configured */
+    if (conn->throughput_mo > 0) {
+        delay = 1.0 / conn->throughput_mo;
+    }
+
     /* Make sure we log into our own log-file if defined */
     log_thread_to(conn->log_idx);
 
@@ -189,13 +195,19 @@
         debug("smsc.http", 0, "HTTP[%s]: Got request `%s'",
               octstr_get_cstr(conn->id), octstr_get_cstr(url));
 
+        /* obey throughput speed limit, if any */
+        if (conn->throughput_mo > 0) {
+            gwthread_sleep(delay);
+        }
+
         if (connect_denied(conndata->allow_ip, ip)) {
             info(0, "HTTP[%s]: Connection `%s' tried from denied "
                     "host %s, ignored", octstr_get_cstr(conn->id),
                     octstr_get_cstr(url), octstr_get_cstr(ip));
             http_close_client(client);
-        } else
+        } else {
             conndata->callbacks->receive_sms(conn, client, headers, body, 
cgivars);
+        }
 
         debug("smsc.http", 0, "HTTP[%s]: Destroying client information",
               octstr_get_cstr(conn->id));
@@ -249,8 +261,8 @@
     /* Make sure we log into our own log-file if defined */
     log_thread_to(conn->log_idx);
 
-    if (conn->throughput) {
-        delay = 1.0 / conn->throughput;
+    if (conn->throughput_mt) {
+        delay = 1.0 / conn->throughput_mt;
     }
 
     while (conndata->shutdown == 0) {
@@ -269,7 +281,7 @@
             break;
 
         /* obey throughput speed limit, if any */
-        if (conn->throughput > 0) {
+        if (conn->throughput_mt > 0) {
             gwthread_sleep(delay);
         }
         counter_increase(conndata->open_sends);
Index: gw/smsc/smsc_smasi.c
===================================================================
--- gw/smsc/smsc_smasi.c        (revision 5324)
+++ gw/smsc/smsc_smasi.c        (working copy)
@@ -872,8 +872,8 @@
 
     if (*pending_submits == -1) return;
 
-    if (smasi->conn->throughput > 0) {
-        delay = 1.0 / smasi->conn->throughput;
+    if (smasi->conn->throughput_mt > 0) {
+        delay = 1.0 / smasi->conn->throughput_mt;
     }
 
     while (*pending_submits < MAX_PENDING_SUBMITS) {
@@ -894,7 +894,7 @@
         smasi_pdu_destroy(pdu);
 
         /* obey throughput speed limit, if any */
-        if (smasi->conn->throughput > 0)
+        if (smasi->conn->throughput_mt > 0)
             gwthread_sleep(delay);
 
         ++(*pending_submits);
Index: gw/smsc/smsc_smpp.c
===================================================================
--- gw/smsc/smsc_smpp.c (revision 5324)
+++ gw/smsc/smsc_smpp.c (working copy)
@@ -184,7 +184,8 @@
     int wait_ack_action;
     int esm_class;
     long log_format;
-    Load *load;
+    Load *load_mt;
+    Load *load_mo;
     SMSCConn *conn;
 } SMPP;
 
@@ -284,8 +285,10 @@
     smpp->bind_addr_npi = 0;
     smpp->use_ssl = 0;
     smpp->ssl_client_certkey_file = NULL;
-    smpp->load = load_create_real(0);
-    load_add_interval(smpp->load, 1);
+    smpp->load_mt = load_create_real(0);
+    load_add_interval(smpp->load_mt, 1);
+    smpp->load_mo = load_create_real(0);
+    load_add_interval(smpp->load_mo, 1);
     smpp->esm_class = esm_class;
 
     return smpp;
@@ -309,7 +312,8 @@
         octstr_destroy(smpp->alt_charset);
         octstr_destroy(smpp->alt_addr_charset);
         octstr_destroy(smpp->ssl_client_certkey_file);
-        load_destroy(smpp->load);
+        load_destroy(smpp->load_mt);
+        load_destroy(smpp->load_mo);
         gw_free(smpp);
     }
 }
@@ -1264,13 +1268,13 @@
 
     while (*pending_submits < smpp->max_pending_submits) {
         /* check our throughput */
-        if (smpp->conn->throughput > 0 && load_get(smpp->load, 0) >= 
smpp->conn->throughput) {
-            debug("bb.sms.smpp", 0, "SMPP[%s]: throughput limit exceeded 
(%.02f,%.02f)",
-                  octstr_get_cstr(smpp->conn->id), load_get(smpp->load, 0), 
smpp->conn->throughput);
+        if (smpp->conn->throughput_mt > 0 && load_get(smpp->load_mt, 0) >= 
smpp->conn->throughput_mt) {
+            debug("bb.sms.smpp", 0, "SMPP[%s]: MT throughput limit exceeded 
(%.02f,%.02f)",
+                  octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mt, 0), 
smpp->conn->throughput_mt);
             break;
         }
-        debug("bb.sms.smpp", 0, "SMPP[%s]: throughput (%.02f,%.02f)",
-              octstr_get_cstr(smpp->conn->id), load_get(smpp->load, 0), 
smpp->conn->throughput);
+        debug("bb.sms.smpp", 0, "SMPP[%s]: MT throughput (%.02f,%.02f)",
+              octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mt, 0), 
smpp->conn->throughput_mt);
 
         /* Get next message, quit if none to be sent */
         msg = gw_prioqueue_remove(smpp->msgs_to_send);
@@ -1291,7 +1295,7 @@
             smpp_pdu_destroy(pdu);
             octstr_destroy(os);
             ++(*pending_submits);
-            load_increase(smpp->load);
+            load_increase(smpp->load_mt);
         }
         else { /* write error occurs */
             smpp_pdu_destroy(pdu);
@@ -1752,6 +1756,7 @@
                 return 0;
             }
             resp = smpp_pdu_create(data_sm_resp, 
pdu->u.data_sm.sequence_number);
+
             /*
              * If SMSCConn stopped then send temp. error code
              */
@@ -1762,6 +1767,18 @@
                 break;
             }
             mutex_unlock(smpp->conn->flow_mutex);
+
+            /* check our throughput */
+            if (smpp->conn->throughput_mo > 0 && load_get(smpp->load_mo, 0) >= 
smpp->conn->throughput_mo) {
+                debug("bb.sms.smpp", 0, "SMPP[%s]: MO throughput limit 
exceeded (%.02f,%.02f)",
+                      octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mo, 
0), smpp->conn->throughput_mo);
+                resp->u.data_sm_resp.command_status = SMPP_ESME_RX_T_APPN;
+                break;
+            }
+            debug("bb.sms.smpp", 0, "SMPP[%s]: MO throughput (%.02f,%.02f)",
+                  octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mo, 0), 
smpp->conn->throughput_mo);
+            load_increase(smpp->load_mo);
+
             /* got a deliver ack (DLR)?
              * NOTE: following SMPP v3.4. spec. we are interested
              *       only on bits 2-5 (some SMSC's send 0x44, and it's
@@ -1818,6 +1835,7 @@
                         octstr_get_cstr(smpp->conn->id), pdu->type_name);
                 return 0;
             }
+            resp = smpp_pdu_create(deliver_sm_resp, 
pdu->u.deliver_sm.sequence_number);
 
             /*
              * If SMSCConn stopped then send temp. error code
@@ -1825,13 +1843,22 @@
             mutex_lock(smpp->conn->flow_mutex);
             if (smpp->conn->is_stopped) {
                 mutex_unlock(smpp->conn->flow_mutex);
-                resp = smpp_pdu_create(deliver_sm_resp,
-                        pdu->u.deliver_sm.sequence_number);
                 resp->u.deliver_sm_resp.command_status = SMPP_ESME_RX_T_APPN;
                 break;
             }
             mutex_unlock(smpp->conn->flow_mutex);
 
+            /* check our throughput */
+            if (smpp->conn->throughput_mo > 0 && load_get(smpp->load_mo, 0) >= 
smpp->conn->throughput_mo) {
+                debug("bb.sms.smpp", 0, "SMPP[%s]: MO throughput limit 
exceeded (%.02f,%.02f)",
+                      octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mo, 
0), smpp->conn->throughput_mo);
+                resp->u.deliver_sm_resp.command_status = SMPP_ESME_RX_T_APPN;
+                break;
+            }
+            debug("bb.sms.smpp", 0, "SMPP[%s]: MO throughput (%.02f,%.02f)",
+                  octstr_get_cstr(smpp->conn->id), load_get(smpp->load_mo, 0), 
smpp->conn->throughput_mo);
+            load_increase(smpp->load_mo);
+
             /* 
              * Got a deliver ack (DLR)?
              * NOTE: following SMPP v3.4. spec. we are interested
@@ -2410,9 +2437,9 @@
                     smpp->throttling_err_time > 0 && pending_submits < 
smpp->max_pending_submits) {
                     time_t tr_timeout = smpp->throttling_err_time + 
SMPP_THROTTLING_SLEEP_TIME - now;
                     timeout = timeout > tr_timeout ? tr_timeout : timeout;
-                } else if (transmitter && gw_prioqueue_len(smpp->msgs_to_send) 
> 0 && smpp->conn->throughput > 0 &&
+                } else if (transmitter && gw_prioqueue_len(smpp->msgs_to_send) 
> 0 && smpp->conn->throughput_mt > 0 &&
                            smpp->max_pending_submits > pending_submits) {
-                    double t = 1.0 / smpp->conn->throughput;
+                    double t = 1.0 / smpp->conn->throughput_mt;
                     timeout = t < timeout ? t : timeout;
                 }
                 /* sleep a while */
Index: gw/smscconn.c
===================================================================
--- gw/smscconn.c       (revision 5324)
+++ gw/smscconn.c       (working copy)
@@ -336,11 +336,28 @@
             panic(0, "Could not compile pattern '%s'", 
octstr_get_cstr(preferred_prefix_regex));
 
     if ((tmp = cfg_get(grp, octstr_imm("throughput"))) != NULL) {
-        if (octstr_parse_double(&conn->throughput, tmp, 0) == -1)
-            conn->throughput = 0;
+        if (octstr_parse_double(&conn->throughput_mt, tmp, 0) == -1) {
+            conn->throughput_mt = conn->throughput_mo = 0;
+        } else {
+            conn->throughput_mo = conn->throughput_mt;
+        }
         octstr_destroy(tmp);
-        info(0, "Set throughput to %.3f for smsc id <%s>", conn->throughput, 
octstr_get_cstr(conn->id));
+        info(0, "Set MT/MO throughput to %.3f for smsc id <%s>", 
conn->throughput_mt, octstr_get_cstr(conn->id));
     }
+    else {
+        if ((tmp = cfg_get(grp, octstr_imm("throughput-mt"))) != NULL) {
+            if (octstr_parse_double(&conn->throughput_mt, tmp, 0) == -1)
+                conn->throughput_mt = 0;
+            octstr_destroy(tmp);
+            info(0, "Set MT throughput to %.3f for smsc id <%s>", 
conn->throughput_mt, octstr_get_cstr(conn->id));
+        }
+        if ((tmp = cfg_get(grp, octstr_imm("throughput-mo"))) != NULL) {
+             if (octstr_parse_double(&conn->throughput_mo, tmp, 0) == -1)
+                 conn->throughput_mo = 0;
+             octstr_destroy(tmp);
+             info(0, "Set MO throughput to %.3f for smsc id <%s>", 
conn->throughput_mo, octstr_get_cstr(conn->id));
+        }
+    }
     /* Sets the admin_id. Equals to connection id if empty */
     GET_OPTIONAL_VAL(conn->admin_id, "smsc-admin-id");
     if (conn->admin_id == NULL)
Index: gw/smscconn_p.h
===================================================================
--- gw/smscconn_p.h     (revision 5324)
+++ gw/smscconn_p.h     (working copy)
@@ -200,7 +200,8 @@
 
     int alt_dcs; /* use alternate DCS 0xFX */
 
-    double throughput;     /* message thoughput per sec. to be delivered to 
SMSC */
+    double throughput_mt;   /* message throughput per sec. to be delivered to 
SMSC */
+    double throughput_mo;   /* message throughput per sec. to be received from 
SMSC */
 
     /* Stores rerouting information for this specific smsc-id */
     int reroute;                /* simply turn MO into MT and process 
internally */
Index: doc/userguide/userguide.xml
===================================================================
--- doc/userguide/userguide.xml (revision 5324)
+++ doc/userguide/userguide.xml (working copy)
@@ -2807,13 +2807,30 @@
         recommended. The name is case-insensitive.
      </entry></row>
 
-    <row><entry><literal>throughput</literal></entry>
+    <row><entry><literal>throughput (r)</literal></entry>
       <entry><literal>float (messages/sec)</literal></entry>
       <entry valign="bottom">
         If SMSC requires that Kannel limits the number of messages per second, 
-        use this variable. This is considered as active throttling. (optional)
+        use this variable. This is considered as active throttling. It 
constrains
+        both sides, MT and MO with the same value. (optional)
      </entry></row>
 
+    <row><entry><literal>throughput-mt (r)</literal></entry>
+      <entry><literal>float (messages/sec)</literal></entry>
+      <entry valign="bottom">
+        If SMSC requires that Kannel limits the number of messages per second, 
+        use this variable. This is considered as active throttling. It 
constrains
+        only the MT side. (optional)
+     </entry></row>
+
+    <row><entry><literal>throughput-mo (r)</literal></entry>
+      <entry><literal>float (messages/sec)</literal></entry>
+      <entry valign="bottom">
+        If SMSC requires that Kannel limits the number of messages per second, 
+        use this variable. This is considered as active throttling. It 
constrains
+        only the MO side. (optional)
+     </entry></row>
+
    <row><entry><literal>denied-smsc-id</literal></entry>
      <entry><literal>id-list</literal></entry>
      <entry valign="bottom">
Index: gwlib/cfg.def
===================================================================
--- gwlib/cfg.def       (revision 5324)
+++ gwlib/cfg.def       (working copy)
@@ -330,6 +330,8 @@
     OCTSTR(our-host)
     OCTSTR(alt-dcs)
     OCTSTR(throughput)
+    OCTSTR(throughput-mt)
+    OCTSTR(throughput-mo)
     OCTSTR(dead-start)
     OCTSTR(alt-charset)
     OCTSTR(host)
Index: ChangeLog
===================================================================
--- ChangeLog   (revision 5324)
+++ ChangeLog   (working copy)
@@ -1,3 +1,13 @@
+2022-06-03 Stipe Tolj  <stolj at kannel.org>
+    * doc/userguide/userguide.xml: document 'throughput[-mt|-mo]' separate 
values.
+    * gw/smscconn_p.h: use separate values for MT/MO throughput configurations.
+    * gw/smscconn.c: handle config directives for separate MT/MO throughput.
+    * gw/smsc/smsc_*.c: use throughput_mt for MT based throughput control.
+    * gw/smsc/smsc_[http|smpp].c: implement MO throughput control.
+    * gwlib/cfg.def: add 'throughput[-mt|-mo]' config directives.
+    This patchset introduces the concept of MO throughput control by own config
+    directives and implementations in SMSC types HTTP and SMPP.
+
 2022-05-19  Stipe Tolj  <stolj at kannel.org>
     * doc/userguide/userguide.xml: add comment about default smsbox-route.
     * gw/bb_boxc.c: allow setting a default 'group = smsbox-route' without

Reply via email to