Hello,

    The attached patch removes an ugly blocking sleep(3) call and resets the
UDS socket before retrying after a failed send attempt. It seems to work in
my limited tests.

This change addresses two XXX comments in the UDS code but does not fix the
kid registration problem yet. More patches will follow.


Thank you,

Alex.
Replace the blocking sleep(3) call and close the UDS socket on failures.

The two XXXs addressed here were not causing any serious bugs on their own,
but the blocking sleep was ugly and possibly in the way of further
kid registration fixes/improvements.

=== modified file 'src/ipc/UdsOp.cc'
--- src/ipc/UdsOp.cc	2013-06-03 14:05:16 +0000
+++ src/ipc/UdsOp.cc	2013-11-06 23:52:01 +0000
@@ -64,75 +64,131 @@ void Ipc::UdsOp::noteTimeout(const CommT
     timedout(); // our kid handles communication timeout
 }
 
 struct sockaddr_un
 Ipc::PathToAddress(const String& pathAddr) {
     assert(pathAddr.size() != 0);
     struct sockaddr_un unixAddr;
     memset(&unixAddr, 0, sizeof(unixAddr));
     unixAddr.sun_family = AF_LOCAL;
     xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
     return unixAddr;
 }
 
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
 
 Ipc::UdsSender::UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage):
         UdsOp(pathAddr),
         message(aMessage),
         retries(10), // TODO: make configurable?
         timeout(10), // TODO: make configurable?
+        sleeping(false),
         writing(false)
 {
     message.address(address);
 }
 
+void Ipc::UdsSender::swanSong()
+{
+    // We may be stopped (e.g., by timedout()) while waiting between retries;
+    // cancelSleep() is a no-op unless a delayed retry is pending.
+    cancelSleep();
+
+    UdsOp::swanSong();
+}
+
+
 void Ipc::UdsSender::start()
 {
     UdsOp::start();
     write();
     if (timeout > 0)
         setTimeout(timeout, "Ipc::UdsSender::noteTimeout");
 }
 
 bool Ipc::UdsSender::doneAll() const
 {
-    return !writing && UdsOp::doneAll();
+    return !writing && !sleeping && UdsOp::doneAll(); // a pending delayed retry keeps the job alive
 }
 
 void Ipc::UdsSender::write()
 {
     debugs(54, 5, HERE);
     typedef CommCbMemFunT<UdsSender, CommIoCbParams> Dialer;
     AsyncCall::Pointer writeHandler = JobCallback(54, 5,
                                       Dialer, this, UdsSender::wrote);
     Comm::Write(conn(), message.raw(), message.size(), writeHandler, NULL);
     writing = true;
 }
 
 void Ipc::UdsSender::wrote(const CommIoCbParams& params)
 {
     debugs(54, 5, HERE << params.conn << " flag " << params.flag << " retries " << retries << " [" << this << ']');
     writing = false;
     if (params.flag != COMM_OK && retries-- > 0) {
-        sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
-        write(); // XXX: should we close on error so that conn() reopens?
+        // perhaps a fresh connection and more time will help?
+        conn()->close(); // NOTE(review): if the socket is already closed, conn() may reopen it just to close it
+        sleep(); // non-blocking; write() is called again from delayedRetry() after the pause
+    } // else: success, or out of retries -- nothing more is attempted for this message
+}
+
+/// schedules a non-blocking pause before resending the message
+void Ipc::UdsSender::sleep()
+{
+    Must(!sleeping); // at most one delayed retry may be pending
+    sleeping = true;
+    Pointer *const jobPtr = new Pointer(this); // deleted by DelayedRetry()
+    eventAdd("Ipc::UdsSender::DelayedRetry", Ipc::UdsSender::DelayedRetry,
+             jobPtr, 1, 0, false); // TODO: Use Fibonacci increments
+}
+
+/// forgets the pending delayed retry, if any (a no-op when not sleeping)
+void Ipc::UdsSender::cancelSleep()
+{
+    if (!sleeping)
+        return;
+    // Keep the event queued (see Comm::ConnOpener::cancelSleep()); delayedRetry() ignores it while !sleeping.
+    sleeping = false;
+    debugs(54, 9, "stops sleeping");
+}
+
+/// eventAdd() callback: schedules delayedRetry() if the job is still valid
+void Ipc::UdsSender::DelayedRetry(void *data)
+{
+    Pointer *const jobPtr = static_cast<Pointer*>(data);
+    assert(jobPtr);
+    UdsSender *const sender = dynamic_cast<UdsSender*>(jobPtr->valid());
+    if (sender) { // re-enter AsyncJob protection via an async job call
+        typedef NullaryMemFunT<Ipc::UdsSender> Dialer;
+        AsyncCall::Pointer call = JobCallback(54, 4, Dialer, sender, Ipc::UdsSender::delayedRetry);
+        ScheduleCallHere(call);
+    }
+    delete jobPtr; // allocated by sleep()
+}
+
+/// makes another sending attempt after a pause; a no-op if cancelSleep() ran meanwhile
+void Ipc::UdsSender::delayedRetry()
+{
+    debugs(54, 5, HERE << sleeping);
+    if (sleeping) {
+        sleeping = false;
+        write(); // reopens the connection if needed
     }
 }
 
 void Ipc::UdsSender::timedout()
 {
     debugs(54, 5, HERE);
     mustStop("timedout");
 }
 
 void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message)
 {
     AsyncJob::Start(new UdsSender(toAddress, message));
 }
 
 const Comm::ConnectionPointer &
 Ipc::ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, Ipc::FdNoteId noteId)
 {
     struct sockaddr_in addr;
     socklen_t len = sizeof(addr);
     if (getsockname(conn->fd, reinterpret_cast<sockaddr*>(&addr), &len) == 0) {

=== modified file 'src/ipc/UdsOp.h'
--- src/ipc/UdsOp.h	2012-10-04 09:14:06 +0000
+++ src/ipc/UdsOp.h	2013-11-06 22:29:36 +0000
@@ -48,48 +48,55 @@ private:
 private:
     int options; ///< UDS options
     Comm::ConnectionPointer conn_; ///< UDS descriptor
 
 private:
     UdsOp(const UdsOp &); // not implemented
     UdsOp &operator= (const UdsOp &); // not implemented
 };
 
 /// converts human-readable filename path into UDS address
 struct sockaddr_un PathToAddress(const String &pathAddr);
 
 // XXX: move UdsSender code to UdsSender.{cc,h}
 /// attempts to send an IPC message a few times, with a timeout
 class UdsSender: public UdsOp
 {
 public:
     UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage);

 protected:
+    virtual void swanSong(); // UdsOp (AsyncJob) API
     virtual void start(); // UdsOp (AsyncJob) API
     virtual bool doneAll() const; // UdsOp (AsyncJob) API
     virtual void timedout(); // UdsOp API

 private:
+    void sleep(); ///< wait (without blocking) before retrying; NB: hides ::sleep() inside this class
+    void cancelSleep(); ///< forget the pending delayed retry, if any
+    static void DelayedRetry(void *data); ///< eventAdd() callback that schedules delayedRetry()
+    void delayedRetry(); ///< retries the write after a pause, unless cancelled
+
     void write(); ///< schedule writing
     void wrote(const CommIoCbParams& params); ///< done writing or error

 private:
     TypedMsgHdr message; ///< what to send
     int retries; ///< how many times to try after a write error
     int timeout; ///< total time to send the message
+    bool sleeping; ///< whether we are waiting to retry a failed write
     bool writing; ///< whether Comm started and did not finish writing

 private:
     UdsSender(const UdsSender&); // not implemented
     UdsSender& operator= (const UdsSender&); // not implemented

     CBDATA_CLASS2(UdsSender);
 };
 
 void SendMessage(const String& toAddress, const TypedMsgHdr& message);
 /// import socket fd from another strand into our Comm state
 const Comm::ConnectionPointer & ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, FdNoteId noteId);
 
 }
 
 #endif /* SQUID_IPC_ASYNCUDSOP_H */

Reply via email to