Repository: incubator-hawq
Updated Branches:
  refs/heads/master abf38b723 -> 62559633a


HAWQ-759. Fix queries that cannot be terminated by pg_cancel_backend or
pg_terminate_backend


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/62559633
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/62559633
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/62559633

Branch: refs/heads/master
Commit: 62559633a90fa427995819352c94a7d0f2513e0a
Parents: abf38b7
Author: Wen Lin <[email protected]>
Authored: Tue Jun 14 10:10:28 2016 +0800
Committer: Wen Lin <[email protected]>
Committed: Tue Jun 14 10:10:28 2016 +0800

----------------------------------------------------------------------
 src/backend/libpq/be-secure.c    |   8 ++
 src/backend/libpq/pqcomm.c       | 170 ++++++++++++++++++++++++++++++----
 src/backend/tcop/postgres.c      |  84 ++++++++++++++++-
 src/backend/utils/init/globals.c |   2 +
 src/include/libpq/pqcomm.h       |   4 +
 src/include/miscadmin.h          |   2 +
 src/include/tcop/tcopprot.h      |   2 +
 7 files changed, 250 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/libpq/be-secure.c
----------------------------------------------------------------------
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 155074d..06fb8d1 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -420,7 +420,11 @@ wloop:
        }
        else
 #endif
+       {
+               prepare_for_client_write();
                n = send(port->sock, ptr, len, 0);
+               client_write_ended();
+       }
 
        return n;
 }
@@ -475,6 +479,8 @@ my_sock_write(BIO *h, const char *buf, int size)
 {
        int                     res = 0;
 
+       prepare_for_client_write();
+
        res = send(h->num, buf, size, 0);
        if (res <= 0)
        {
@@ -484,6 +490,8 @@ my_sock_write(BIO *h, const char *buf, int size)
                }
        }
 
+       client_write_ended();
+
        return res;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/libpq/pqcomm.c
----------------------------------------------------------------------
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 3365558..dc00ee5 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -94,6 +94,7 @@
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "cdb/cdbvars.h"
+#include "tcop/tcopprot.h"
 
 /*
  * Configuration options
@@ -133,6 +134,8 @@ static bool DoingCopyOut;
 static void pq_close(int code, Datum arg);
 static int     internal_putbytes(const char *s, size_t len);
 static int     internal_flush(void);
+static void pq_set_nonblocking(bool nonblocking);
+static bool pq_send_mutex_lock(void);
 
 #ifdef HAVE_UNIX_SOCKETS
 static int     Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName);
@@ -752,6 +755,43 @@ TouchSocketFile(void)
  * --------------------------------
  */
 
+/* --------------------------------
+ *                       pq_set_nonblocking - set socket blocking/non-blocking
+ *
+ * Puts the client socket (MyProcPort->sock) into non-blocking mode if
+ * nonblocking is TRUE, or into blocking mode otherwise.  Returns at once
+ * when MyProcPort->noblock already records the requested mode.  On WIN32
+ * only the pgwin32_noblock flag is changed.  Failures are reported with
+ * COMMERROR rather than ERROR, so no error message is sent to the client.
+ * --------------------------------
+ */
+static void
+pq_set_nonblocking(bool nonblocking)
+{
+       if (MyProcPort->noblock == nonblocking)
+               return;
+
+#ifdef WIN32
+       pgwin32_noblock = nonblocking ? 1 : 0;
+#else
+
+       /*
+        * Use COMMERROR on failure, because ERROR would try to send the error to
+        * the client, which might require changing the mode again, leading to
+        * infinite recursion.
+        */
+       if (nonblocking)
+       {
+               if (!pg_set_noblock(MyProcPort->sock))
+                       ereport(COMMERROR,
+                                 (errmsg("could not set socket to non-blocking 
mode: %m")));
+       }
+       else
+       {
+               if (!pg_set_block(MyProcPort->sock))
+                       ereport(COMMERROR,
+                                       (errmsg("could not set socket to 
blocking mode: %m")));
+       }
+#endif
+       /* recorded unconditionally, even if the mode change above failed */
+       MyProcPort->noblock = nonblocking;
+}
 
 /* --------------------------------
  *             pq_recvbuf - load some bytes into the input buffer
@@ -1191,6 +1231,77 @@ pq_getmessage(StringInfo s, int maxlen)
        return 0;
 }
 
+/*
+ * pq_send_mutex_lock - acquire send_mutex in an interruptible way
+ *
+ * Polls pthread_mutex_trylock() once per millisecond instead of blocking in
+ * pthread_mutex_lock(), so that the wait can be abandoned once the backend
+ * has received a termination request (TermSignalReceived).
+ *
+ * Returns true as soon as the mutex is held; the caller must unlock it.
+ * Returns false only when TermSignalReceived is set and
+ * PQ_BUSY_TEST_COUNT_IN_EXITING attempts made since then have all failed.
+ * Before returning false, the client socket is shut down and closed,
+ * MyProcPort->sock is set to -1 and whereToSendOutput to DestNone.  As long
+ * as no termination request has arrived, the routine keeps retrying.
+ *
+ * Closing the socket is what lets a dispatch thread that is stuck in send()
+ * (e.g. sending a NOTICE to the client) get out of it, which avoids a
+ * deadlock on send_mutex.
+ *
+ * NOTE: must not call CHECK_FOR_INTERRUPTS or ereport, since it runs in a
+ * multi-threaded context.
+ */
+static bool
+pq_send_mutex_lock(void)
+{
+       int remaining = PQ_BUSY_TEST_COUNT_IN_EXITING;
+
+       for (;;)
+       {
+               int mutex_res = pthread_mutex_trylock(&send_mutex);
+
+               if (mutex_res == 0)
+                       return true;
+
+               /*
+                * Once a termination request has arrived, every failed attempt
+                * (normally EBUSY) uses up one retry.  Counting any error, not just
+                * EBUSY, ensures that an unexpected return code cannot keep a
+                * terminating backend spinning here forever.  Retrying a few times
+                * first avoids dropping the connection when another thread is
+                * merely in the middle of a real write rather than stuck in
+                * secure_write().  A slow writer can still lose output this way,
+                * which is acceptable for a process that is exiting anyway.
+                *
+                * TermSignalReceived is read without a lock; it is simply re-read
+                * on every iteration of this loop.
+                */
+               if (TermSignalReceived && --remaining <= 0)
+               {
+                       /*
+                        * On Red Hat and SUSE, simply closing the socket does not get a
+                        * send() out of its hanging state, but shutdown() does (though
+                        * the manual page does not say so explicitly).  Sending on a
+                        * socket that has been shut down would kill the process with
+                        * SIGPIPE, so mark the socket invalid before calling shutdown()
+                        * to avoid that race.
+                        *
+                        * On OS X, close() gets send() out of its hanging state, while
+                        * shutdown() would lead to SIGPIPE.
+                        */
+                       int saved_fd = MyProcPort->sock;
+
+                       MyProcPort->sock = -1;
+                       whereToSendOutput = DestNone;
+
+                       /* an earlier failed call may already have closed the socket */
+                       if (saved_fd != -1)
+                       {
+#ifndef __darwin__
+                               shutdown(saved_fd, SHUT_WR);
+#endif
+                               closesocket(saved_fd);
+                       }
+                       return false;
+               }
+
+               pg_usleep(1000L);
+       }
+}
+
 
 /* --------------------------------
  *             pq_putbytes             - send bytes to connection (not flushed 
until pq_flush)
@@ -1205,9 +1316,12 @@ pq_putbytes(const char *s, size_t len)
 
        /* Should only be called by old-style COPY OUT */
        Assert(DoingCopyOut);
-    pthread_mutex_lock(&send_mutex);
+       if (!pq_send_mutex_lock())
+       {
+               return EOF;
+       }
        res = internal_putbytes(s, len);
-    pthread_mutex_unlock(&send_mutex);
+       pthread_mutex_unlock(&send_mutex);
        return res;
 }
 
@@ -1246,7 +1360,13 @@ pq_flush(void)
 
        /* No-op if reentrant call */
        if ((Gp_role == GP_ROLE_DISPATCH) && IsUnderPostmaster)
-               pthread_mutex_lock(&send_mutex);
+       {
+               if (!pq_send_mutex_lock())
+               {
+                       return EOF;
+               }
+       }
+       pq_set_nonblocking(false);
        res = internal_flush();
        if ((Gp_role == GP_ROLE_DISPATCH) && IsUnderPostmaster)
                pthread_mutex_unlock(&send_mutex);
@@ -1287,6 +1407,7 @@ internal_flush(void)
                                
                                HOLD_INTERRUPTS();
 
+                               /* we can use ereport here, for the protection 
of send mutex */
                                ereport(COMMERROR,
                                                (errcode_for_socket_access(),
                                                 errmsg("could not send data to 
client: %m")));
@@ -1340,9 +1461,7 @@ internal_flush(void)
  *
  *             We also suppress messages generated while pqcomm.c is busy.  
This
  *             avoids any possibility of messages being inserted within other
- *             messages.  The only known trouble case arises if SIGQUIT occurs
- *             during a pqcomm.c routine --- quickdie() will try to send a 
warning
- *             message, and the most reasonable approach seems to be to drop 
it.
+ *             messages.
  *
  *             returns 0 if OK, EOF if trouble
  * --------------------------------
@@ -1350,21 +1469,25 @@ internal_flush(void)
 int
 pq_putmessage(char msgtype, const char *s, size_t len)
 {
-    int ret = EOF;
+
+       if (DoingCopyOut)
+       {
+               return EOF;
+       }
+
        if ((Gp_role == GP_ROLE_DISPATCH) && IsUnderPostmaster)
-               pthread_mutex_lock(&send_mutex);
+       {
+               if (!pq_send_mutex_lock())
+               {
+                       return EOF;
+               }
+       }
 
-    if (DoingCopyOut)
-    {
-        ret = 0;
-        goto fail;
-    }
-        
        if (msgtype)
-    {
+       {
                if (internal_putbytes(&msgtype, 1))
                        goto fail;
-    }
+       }
 
        if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
        {
@@ -1397,9 +1520,13 @@ fail:
 void
 pq_startcopyout(void)
 {
-    pthread_mutex_lock(&send_mutex);
+       if (!pq_send_mutex_lock())
+       {
+               /*
+                * Nothing to report to the caller: pq_send_mutex_lock() only fails
+                * after it has closed the client socket, so DoingCopyOut is left
+                * unchanged and there is no connection left to copy out to.
+                */
+               return;
+       }
        DoingCopyOut = true;
-    pthread_mutex_unlock(&send_mutex);
+       pthread_mutex_unlock(&send_mutex);
 }
 
 /* --------------------------------
@@ -1420,9 +1547,12 @@ pq_endcopyout(bool errorAbort)
        if (errorAbort)
                pq_putbytes("\n\n\\.\n", 5);
        /* in non-error case, copy.c will have emitted the terminator line */
-    pthread_mutex_lock(&send_mutex);
+       if (!pq_send_mutex_lock())
+       {
+               return;
+       }
        DoingCopyOut = false;
-    pthread_mutex_unlock(&send_mutex);
+       pthread_mutex_unlock(&send_mutex);
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/tcop/postgres.c
----------------------------------------------------------------------
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 00a1a70..5a8327e 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -214,6 +214,8 @@ static PreparedStatement *unnamed_stmt_pstmt = NULL;
 
 static bool EchoQuery = false; /* default don't echo */
 
+/*
+ * Set to true by prepare_for_client_read() and back to false by
+ * client_read_ended() (in one of their branches), i.e. while we may be
+ * blocked in the recv() of secure_read.  die() reads it from signal-handler
+ * context to tell an interrupted client read from an interrupted client
+ * write, so it must be volatile like the other flags die() examines.
+ */
+static volatile bool DoingPqReading = false;
+
 extern pthread_t main_tid;
 #ifndef _WIN32
 pthread_t main_tid = (pthread_t)0;
@@ -599,6 +601,17 @@ ReadCommand(StringInfo inBuf)
  * non-reentrant libc functions.  This restriction makes it safe for us
  * to allow interrupt service routines to execute nontrivial code while
  * we are waiting for input.
+ *
+ * When waiting in the main loop, we can process any interrupt immediately
+ * in the signal handler. In any other read from the client, like in a COPY
+ * FROM STDIN, we can't safely process a query cancel signal, because we might
+ * be in the middle of sending a message to the client, and jumping out would
+ * violate the protocol. Or rather, pqcomm.c would detect it and refuse to
+ * send any more messages to the client. But handling a SIGTERM is OK, because
+ * we're terminating the backend and don't need to send any more messages
+ * anyway. That means that we might not be able to send an error message to
+ * the client, but that seems better than waiting indefinitely, in case the
+ * client is not responding.
  */
 void
 prepare_for_client_read(void)
@@ -619,6 +632,18 @@ prepare_for_client_read(void)
                QueryCancelPending = false;
                CHECK_FOR_INTERRUPTS();
        }
+       else
+       {
+               DoingPqReading = true;
+               /* Allow die interrupts to be processed while waiting */
+               ImmediateDieOK = true;
+
+               /* Process the ones that already arrived */
+               if (ProcDiePending)
+               {
+                       CHECK_FOR_INTERRUPTS();
+               }
+       }
 }
 
 /*
@@ -637,8 +662,48 @@ client_read_ended(void)
                DisableNotifyInterrupt();
                DisableCatchupInterrupt();
        }
+       else
+       {
+               ImmediateDieOK = false;
+               DoingPqReading = false;
+       }
+}
+
+/*
+ * prepare_for_client_write -- set up to possibly block on client output
+ *
+ * Write-side counterpart of prepare_for_client_read.  On the main thread it
+ * sets ImmediateDieOK, so that die() may act right away while we are blocked
+ * sending to the client, and it services a die request that is already
+ * pending.  When called from any other thread (a dispatch thread may write
+ * to the client too), it returns without doing anything, since these flags
+ * and CHECK_FOR_INTERRUPTS are meant for the main backend thread.
+ */
+void
+prepare_for_client_write(void)
+{
+       /* interrupt handling is only armed on the main thread */
+       if (!pthread_equal(main_tid, pthread_self()))
+               return;
+
+       /* let die() act immediately while the write may block */
+       ImmediateDieOK = true;
+
+       /* service a termination request that arrived before this point */
+       if (ProcDiePending)
+               CHECK_FOR_INTERRUPTS();
 }
 
+/*
+ * client_write_ended -- get out of the client-output state
+ *
+ * This is called just after low-level writes.  On the main thread it clears
+ * ImmediateDieOK again, undoing prepare_for_client_write; on any other
+ * thread it does nothing.
+ */
+void
+client_write_ended(void)
+{
+       if (pthread_equal(main_tid, pthread_self()))
+       {
+               ImmediateDieOK = false;
+       }
+}
 
 /*
  * Parse a query string and pass it through the rewriter.
@@ -3295,6 +3360,7 @@ die(SIGNAL_ARGS)
        {
                InterruptPending = true;
                ProcDiePending = true;
+               TermSignalReceived = true;
 
                /* although we don't strictly need to set this to true since the
                 * ProcDiePending will occur first.  We set this anyway since 
the
@@ -3307,9 +3373,22 @@ die(SIGNAL_ARGS)
                 * If it's safe to interrupt, and we're waiting for input or a 
lock,
                 * service the interrupt immediately
                 */
-               if (ImmediateInterruptOK && InterruptHoldoffCount == 0 &&
-                       CritSectionCount == 0)
+               if ((ImmediateInterruptOK || ImmediateDieOK) &&
+                       InterruptHoldoffCount == 0 && CritSectionCount == 0)
                {
+                       if (ImmediateDieOK && !DoingPqReading)
+                       {
+                               /*
+                                * Getting here indicates that we have been 
interrupted during a
+                                * data message is under sending to client, so 
close the connection
+                                * immediately, since sending any more bytes 
may cause self dead
+                                * lock(though we can handle this using 
pq_send_mutex_lock() now, it
+                                * is better to avoid the unnecessary cost).
+                                */
+                               close(MyProcPort->sock);
+                               whereToSendOutput = DestNone;
+                       }
+
                        /* bump holdoff count to make ProcessInterrupts() a 
no-op */
                        /* until we are done getting ready for it */
                        InterruptHoldoffCount++;
@@ -3486,6 +3565,7 @@ ProcessInterrupts(void)
                ProcDiePending = false;
                QueryCancelPending = false;             /* ProcDie trumps 
QueryCancel */
                ImmediateInterruptOK = false;   /* not idle anymore */
+               ImmediateDieOK = false;         /* prevent re-entry */
                DisableNotifyInterrupt();
                DisableCatchupInterrupt();
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/utils/init/globals.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 842b2d1..a0220a6 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -33,6 +33,8 @@ volatile bool ProcDiePending = false;
 volatile bool ClientConnectionLost = false;
 volatile bool ImmediateInterruptOK = false;
 volatile bool InterruptWhenCallingPLUDF = false;
+/* set while a blocking client read/write may be cut short directly by die() */
+volatile bool ImmediateDieOK = false;
+/* set by die(); lets pq_send_mutex_lock() stop waiting for the send mutex */
+volatile bool TermSignalReceived = false;
 
 // Make these signed integers (instead of uint32) to detect garbage negative 
values.
 volatile int32 InterruptHoldoffCount = 0;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/include/libpq/pqcomm.h
----------------------------------------------------------------------
diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h
index 54798fb..6fae8f0 100644
--- a/src/include/libpq/pqcomm.h
+++ b/src/include/libpq/pqcomm.h
@@ -201,4 +201,8 @@ typedef struct PrimaryMirrorTransitionPacket
        uint32 dataLength;
 } PrimaryMirrorTransitionPacket;
 
+/*
+ * Retry budget of pq_send_mutex_lock(): how many more attempts it makes to
+ * acquire the send mutex for the frontend connection after detecting that
+ * the process is exiting, before it closes the client socket instead.
+ */
+#define PQ_BUSY_TEST_COUNT_IN_EXITING 5
+
 #endif   /* PQCOMM_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/include/miscadmin.h
----------------------------------------------------------------------
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 80c3a17..aabef90 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -80,6 +80,8 @@ extern volatile bool ClientConnectionLost;
 
 /* these are marked volatile because they are examined by signal handlers: */
 extern volatile bool ImmediateInterruptOK;
+extern volatile bool ImmediateDieOK;     /* die() may act during client I/O */
+extern volatile bool TermSignalReceived; /* set by die() */
 extern volatile bool InterruptWhenCallingPLUDF;
 extern PGDLLIMPORT volatile int32 InterruptHoldoffCount;
 extern PGDLLIMPORT volatile int32 CritSectionCount;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/include/tcop/tcopprot.h
----------------------------------------------------------------------
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index ad45a3a..5fc42b7 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -65,6 +65,8 @@ extern void StatementCancelHandler(SIGNAL_ARGS);
 extern void FloatExceptionHandler(SIGNAL_ARGS);
 extern void prepare_for_client_read(void);
 extern void client_read_ended(void);
+/* bracket low-level sends to the client, like the read pair above */
+extern void prepare_for_client_write(void);
+extern void client_write_ended(void);
 extern int     PostgresMain(int argc, char *argv[], const char *username);
 extern long get_stack_depth_rlimit(void);
 extern void ResetUsage(void);

Reply via email to