I have also run into the problem with the throttling of output on the
smpp connections.

I worked out a bit of a hack that I will attach as a patch.
I don't like it very much since it pushes too much information down into
called routines.
But I believe that it will work for the most part.

There is also an unresolved issue of what to do when the incoming
messages start creating a huge queue.
What will happen when the queue builds? Will the input be limited by no
longer accepting messages or will it keep growing till things break?

One of my thoughts is that the send and receive functions need to have
their own thread each.
This would clean up some of the problems with handling timeouts like
this. The trade-off would be a more complex co-ordination task between the
threads.

On a somewhat different topic:
The time calculations are done with a mixture of longs and doubles.
If all the times were converted to doubles, then it would be possible to
co-ordinate events with sub-second precision.
I am not sure if this is worth the effort or not.

--
Alvin Starr                   ||   voice: (416)585-9971
Interlink Connectivity        ||   fax:   (416)585-9974
[EMAIL PROTECTED]              ||



--- gateway-1.4.1/gw/smsc/smsc_smpp.c	2006-06-13 09:50:49.000000000 -0400
+++ gateway-1.4.1-wip/gw/smsc/smsc_smpp.c	2007-05-21 19:55:18.000000000 -0400
@@ -980,20 +980,26 @@
 }
 
 
-static void send_messages(SMPP *smpp, Connection *conn, long *pending_submits)
+static void send_messages(SMPP *smpp, Connection *conn, long *pending_submits, long *last_sent)
 {
     Msg *msg;
     SMPP_PDU *pdu;
     Octstr *os;
     double delay = 0;
+    double nextenq;
+    long start_time;
 
     if (*pending_submits == -1)
         return;
 
+
     if (smpp->conn->throughput > 0) {
         delay = 1.0 / smpp->conn->throughput;
+    	if ((date_universal_now() - *last_sent) < delay)
+	    return;
     }
 
+
     while (*pending_submits < smpp->max_pending_submits) {
     	/* Get next message, quit if none to be sent */
     	msg = gwlist_extract_first(smpp->msgs_to_send);
@@ -1006,6 +1012,10 @@
             bb_smscconn_send_failed(smpp->conn, msg, SMSCCONN_FAILED_MALFORMED, octstr_create("MALFORMED SMS"));
             continue;
         }
+
+	start_time = date_universal_now();
+	*last_sent = start_time;
+
         /* check for write errors */
         if (send_pdu(conn, smpp->conn->id, pdu) == 0) {
             struct smpp_msg *smpp_msg = smpp_msg_create(msg);
@@ -1017,8 +1027,17 @@
             /*
              * obey throughput speed limit, if any.
              */
-            if (smpp->conn->throughput > 0)
-                gwthread_sleep(delay);
+            if (smpp->conn->throughput > 0) {
+		/*
+		 * exit if the next delay will run us over the time of the last enquire
+		 */
+		if ((*last_sent - start_time) + delay > smpp->enquire_link_interval) break;
+		/*
+		 * if we did not time out then likely there was an external event so we
+		 * should scan the loop again.
+		 */
+                if (gwthread_sleep(delay) != 0) break;
+	    }
         }
         else { /* write error occurs */
             smpp_pdu_destroy(pdu);
@@ -1728,10 +1747,12 @@
     Connection *conn;
     int ret;
     long last_enquire_sent;
+    long last_sent_time;
     long pending_submits;
     long len;
     SMPP_PDU *pdu;
     double timeout;
+    double throughput_timeout;
     time_t last_response, last_cleanup;
 
     io_arg = arg;
@@ -1751,13 +1772,20 @@
         else
             conn = open_receiver(smpp);
 
-        last_enquire_sent = last_cleanup = last_response = date_universal_now();
+        last_sent_time = last_enquire_sent = last_cleanup = last_response = date_universal_now();
         pending_submits = -1;
         len = 0;
         smpp->throttling_err_time = 0;
         for (;conn != NULL;) {
             timeout = last_enquire_sent + smpp->enquire_link_interval
                         - date_universal_now();
+            if (smpp->conn->throughput > 0) {
+		throughput_timeout = last_sent_time + (1.0/smpp->conn->throughput)
+                        - date_universal_now();
+		if (throughput_timeout < timeout ) timeout = throughput_timeout; 
+		if (timeout <= 0) timeout = 0;
+	    }
+
 
             if (conn_wait(conn, timeout) == -1)
                 break;
@@ -1809,7 +1837,7 @@
                 /* Make sure we send even if we read a lot */
                 if (transmitter && difftime(time(NULL), smpp->throttling_err_time) > SMPP_THROTTLING_SLEEP_TIME) {
                     smpp->throttling_err_time = 0;
-                    send_messages(smpp, conn, &pending_submits);
+                    send_messages(smpp, conn, &pending_submits,&last_sent_time);
                 }
             }
 
@@ -1837,7 +1865,7 @@
                                                                     
             if (transmitter && difftime(time(NULL), smpp->throttling_err_time) > SMPP_THROTTLING_SLEEP_TIME) {
                 smpp->throttling_err_time = 0;
-                send_messages(smpp, conn, &pending_submits);
+                send_messages(smpp, conn, &pending_submits,&last_sent_time);
             }
         }
 
--- gateway-1.4.1/gwlib/gwthread.h	2005-09-19 18:07:33.000000000 -0400
+++ gateway-1.4.1-wip/gwlib/gwthread.h	2007-05-21 14:44:39.000000000 -0400
@@ -139,8 +139,9 @@
 int gwthread_poll(struct pollfd *fds, long numfds, double timeout);
 
 /* Sleep until "seconds" seconds have elapsed, or until another thread
- * calls gwthread_wakeup on us.  Fractional seconds are allowed. */
-void gwthread_sleep(double seconds);
+ * calls gwthread_wakeup on us.  Fractional seconds are allowed. 
+ * 0 is returned if the sleep timed out -1 on error and +1 on external wakeup */
+int gwthread_sleep(double seconds);
 
 /* Force a specific thread to terminate. Returns 0 on success, -1 if the
  * thread has been terminated while calling and non-zero for the pthread
--- gateway-1.4.1/gwlib/gwthread-pthread.c	2006-05-23 12:28:57.000000000 -0400
+++ gateway-1.4.1-wip/gwlib/gwthread-pthread.c	2007-05-21 14:20:16.000000000 -0400
@@ -752,7 +752,7 @@
 }
 
 
-void gwthread_sleep(double seconds)
+int gwthread_sleep(double seconds)
 {
     struct pollfd pollfd;
     struct threadinfo *threadinfo;
@@ -777,6 +777,7 @@
     if (ret == 1) {
         flushpipe(pollfd.fd);
     }
+    return(ret);
 }
 
 

Reply via email to