Repository: incubator-hawq Updated Branches: refs/heads/master abf38b723 -> 62559633a
HAWQ-759. Fix query cannot be terminated by pg_cancel_backend or pg_terminate_backend Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/62559633 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/62559633 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/62559633 Branch: refs/heads/master Commit: 62559633a90fa427995819352c94a7d0f2513e0a Parents: abf38b7 Author: Wen Lin <[email protected]> Authored: Tue Jun 14 10:10:28 2016 +0800 Committer: Wen Lin <[email protected]> Committed: Tue Jun 14 10:10:28 2016 +0800 ---------------------------------------------------------------------- src/backend/libpq/be-secure.c | 8 ++ src/backend/libpq/pqcomm.c | 170 ++++++++++++++++++++++++++++++---- src/backend/tcop/postgres.c | 84 ++++++++++++++++- src/backend/utils/init/globals.c | 2 + src/include/libpq/pqcomm.h | 4 + src/include/miscadmin.h | 2 + src/include/tcop/tcopprot.h | 2 + 7 files changed, 250 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/libpq/be-secure.c ---------------------------------------------------------------------- diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 155074d..06fb8d1 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -420,7 +420,11 @@ wloop: } else #endif + { + prepare_for_client_write(); n = send(port->sock, ptr, len, 0); + client_write_ended(); + } return n; } @@ -475,6 +479,8 @@ my_sock_write(BIO *h, const char *buf, int size) { int res = 0; + prepare_for_client_write(); + res = send(h->num, buf, size, 0); if (res <= 0) { @@ -484,6 +490,8 @@ my_sock_write(BIO *h, const char *buf, int size) } } + client_write_ended(); + return res; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/libpq/pqcomm.c 
---------------------------------------------------------------------- diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index 3365558..dc00ee5 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -94,6 +94,7 @@ #include "storage/ipc.h" #include "utils/guc.h" #include "cdb/cdbvars.h" +#include "tcop/tcopprot.h" /* * Configuration options @@ -133,6 +134,8 @@ static bool DoingCopyOut; static void pq_close(int code, Datum arg); static int internal_putbytes(const char *s, size_t len); static int internal_flush(void); +static void pq_set_nonblocking(bool nonblocking); +static bool pq_send_mutex_lock(); #ifdef HAVE_UNIX_SOCKETS static int Lock_AF_UNIX(unsigned short portNumber, char *unixSocketName); @@ -752,6 +755,43 @@ TouchSocketFile(void) * -------------------------------- */ +/* -------------------------------- + * pq_set_nonblocking - set socket blocking/non-blocking + * + * Sets the socket non-blocking if nonblocking is TRUE, or sets it + * blocking otherwise. + * -------------------------------- + */ +static void +pq_set_nonblocking(bool nonblocking) +{ + if (MyProcPort->noblock == nonblocking) + return; + +#ifdef WIN32 + pgwin32_noblock = nonblocking ? 1 : 0; +#else + + /* + * Use COMMERROR on failure, because ERROR would try to send the error to + * the client, which might require changing the mode again, leading to + * infinite recursion. 
+ */ + if (nonblocking) + { + if (!pg_set_noblock(MyProcPort->sock)) + ereport(COMMERROR, + (errmsg("could not set socket to non-blocking mode: %m"))); + } + else + { + if (!pg_set_block(MyProcPort->sock)) + ereport(COMMERROR, + (errmsg("could not set socket to blocking mode: %m"))); + } +#endif + MyProcPort->noblock = nonblocking; +} /* -------------------------------- * pq_recvbuf - load some bytes into the input buffer @@ -1191,6 +1231,77 @@ pq_getmessage(StringInfo s, int maxlen) return 0; } +/* + * Wrapper of simple pthread locking functionality, using pthread_mutex_trylock + * in a loop to make it interruptible while waiting for the lock; + * + * return true if it successfully acquires the lock, false if unable to get the lock + * and interrupted by SIGTERM, otherwise, infinitely loop to acquire the mutex. + * + * If we are going to return false, we close the socket to client; this is crucial + * for exiting dispatch thread if it is stuck on sending NOTICE to client, and hence + * avoid mutex deadlock; + * + * NOTE: should not call CHECK_FOR_INTERRUPTS and ereport in this routine, since + * it is in multi-thread context; + */ +static bool +pq_send_mutex_lock() +{ + int count = PQ_BUSY_TEST_COUNT_IN_EXITING; + int mutex_res; + + do + { + mutex_res = pthread_mutex_trylock(&send_mutex); + + if (mutex_res == 0) + { + return true; + } + + if (mutex_res == EBUSY) + { + /* No need to acquire lock for TermSignalReceived, since we are in + * a loop here */ + if (TermSignalReceived) + { + /* + * try PQ_BUSY_TEST_COUNT_IN_EXITING times before going to + * close the socket, in case real concurrent writing is in + * progress(compared to stuck send call in secure_write); + * + * It cannot help completely eliminate the false negative + * cases, but given that the process is exiting, it is acceptable + * to discard some messages, contrasted with the chance of + * infinite stuck; + */ + if (count-- < 0) + { + /* On Redhat and Suse, simply closing the socket would not get + * send() out of 
hanging state, shutdown() can do this(though not + * explicitly mentioned in manual page); however, if send over a + * socket which has been shutdown, process would be terminated by + * SIGPIPE; to avoid this race condition, we set the socket to be + * invalid before calling shutdown() + * + * On OSX, close() can get send() out of hanging state, while + * shutdown() would lead to SIGPIPE */ + int saved_fd = MyProcPort->sock; + MyProcPort->sock = -1; + whereToSendOutput = DestNone; +#ifndef __darwin__ + shutdown(saved_fd, SHUT_WR); +#endif + closesocket(saved_fd); + return false; + } + } + } + pg_usleep(1000L); + } while (true); +} + /* -------------------------------- * pq_putbytes - send bytes to connection (not flushed until pq_flush) @@ -1205,9 +1316,12 @@ pq_putbytes(const char *s, size_t len) /* Should only be called by old-style COPY OUT */ Assert(DoingCopyOut); - pthread_mutex_lock(&send_mutex); + if (!pq_send_mutex_lock()) + { + return EOF; + } res = internal_putbytes(s, len); - pthread_mutex_unlock(&send_mutex); + pthread_mutex_unlock(&send_mutex); return res; } @@ -1246,7 +1360,13 @@ pq_flush(void) /* No-op if reentrant call */ if ((Gp_role == GP_ROLE_DISPATCH) && IsUnderPostmaster) - pthread_mutex_lock(&send_mutex); + { + if (!pq_send_mutex_lock()) + { + return EOF; + } + } + pq_set_nonblocking(false); res = internal_flush(); if ((Gp_role == GP_ROLE_DISPATCH) && IsUnderPostmaster) pthread_mutex_unlock(&send_mutex); @@ -1287,6 +1407,7 @@ internal_flush(void) HOLD_INTERRUPTS(); + /* we can use ereport here, for the protection of send mutex */ ereport(COMMERROR, (errcode_for_socket_access(), errmsg("could not send data to client: %m"))); @@ -1340,9 +1461,7 @@ internal_flush(void) * * We also suppress messages generated while pqcomm.c is busy. This * avoids any possibility of messages being inserted within other - * messages. 
The only known trouble case arises if SIGQUIT occurs - * during a pqcomm.c routine --- quickdie() will try to send a warning - * message, and the most reasonable approach seems to be to drop it. + * messages. * * returns 0 if OK, EOF if trouble * -------------------------------- @@ -1350,21 +1469,25 @@ internal_flush(void) int pq_putmessage(char msgtype, const char *s, size_t len) { - int ret = EOF; + + if (DoingCopyOut) + { + return EOF; + } + if ((Gp_role == GP_ROLE_DISPATCH) && IsUnderPostmaster) - pthread_mutex_lock(&send_mutex); + { + if (!pq_send_mutex_lock()) + { + return EOF; + } + } - if (DoingCopyOut) - { - ret = 0; - goto fail; - } - if (msgtype) - { + { if (internal_putbytes(&msgtype, 1)) goto fail; - } + } if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { @@ -1397,9 +1520,13 @@ fail: void pq_startcopyout(void) { - pthread_mutex_lock(&send_mutex); + if (!pq_send_mutex_lock()) + { + /* no need to return a status, since socket has been closed in failed cases */ + return; + } DoingCopyOut = true; - pthread_mutex_unlock(&send_mutex); + pthread_mutex_unlock(&send_mutex); } /* -------------------------------- @@ -1420,9 +1547,12 @@ pq_endcopyout(bool errorAbort) if (errorAbort) pq_putbytes("\n\n\\.\n", 5); /* in non-error case, copy.c will have emitted the terminator line */ - pthread_mutex_lock(&send_mutex); + if (!pq_send_mutex_lock()) + { + return; + } DoingCopyOut = false; - pthread_mutex_unlock(&send_mutex); + pthread_mutex_unlock(&send_mutex); } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/tcop/postgres.c ---------------------------------------------------------------------- diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 00a1a70..5a8327e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -214,6 +214,8 @@ static PreparedStatement *unnamed_stmt_pstmt = NULL; static bool EchoQuery = false; /* default don't echo */ +static bool DoingPqReading = false; /* in the middle 
of recv call of secure_read */ + extern pthread_t main_tid; #ifndef _WIN32 pthread_t main_tid = (pthread_t)0; @@ -599,6 +601,17 @@ ReadCommand(StringInfo inBuf) * non-reentrant libc functions. This restriction makes it safe for us * to allow interrupt service routines to execute nontrivial code while * we are waiting for input. + * + * When waiting in the main loop, we can process any interrupt immediately + * in the signal handler. In any other read from the client, like in a COPY + * FROM STDIN, we can't safely process a query cancel signal, because we might + * be in the middle of sending a message to the client, and jumping out would + * violate the protocol. Or rather, pqcomm.c would detect it and refuse to + * send any more messages to the client. But handling a SIGTERM is OK, because + * we're terminating the backend and don't need to send any more messages + * anyway. That means that we might not be able to send an error message to + * the client, but that seems better than waiting indefinitely, in case the + * client is not responding. */ void prepare_for_client_read(void) @@ -619,6 +632,18 @@ prepare_for_client_read(void) QueryCancelPending = false; CHECK_FOR_INTERRUPTS(); } + else + { + DoingPqReading = true; + /* Allow die interrupts to be processed while waiting */ + ImmediateDieOK = true; + + /* Process the ones that already arrived */ + if (ProcDiePending) + { + CHECK_FOR_INTERRUPTS(); + } + } } /* @@ -637,8 +662,48 @@ client_read_ended(void) DisableNotifyInterrupt(); DisableCatchupInterrupt(); } + else + { + ImmediateDieOK = false; + DoingPqReading = false; + } +} + +/* + * prepare_for_client_write -- set up to possibly block on client output + * + * Like prepare_for_client_read, but for writing. 
+ * + * NOTE: this routine may be called in dispatch thread; + */ +void +prepare_for_client_write(void) +{ + /* Only enable this on main thread */ + if (pthread_equal(main_tid, pthread_self())) + { + /* Allow die interrupts to be processed while waiting */ + ImmediateDieOK = true; + + /* And don't forget to detect one that already arrived */ + if (ProcDiePending) + CHECK_FOR_INTERRUPTS(); + } } +/* + * client_write_ended -- get out of the client-output state + * + * This is called just after low-level writes. + */ +void +client_write_ended(void) +{ + if (pthread_equal(main_tid, pthread_self())) + { + ImmediateDieOK = false; + } +} /* * Parse a query string and pass it through the rewriter. @@ -3295,6 +3360,7 @@ die(SIGNAL_ARGS) { InterruptPending = true; ProcDiePending = true; + TermSignalReceived = true; /* although we don't strictly need to set this to true since the * ProcDiePending will occur first. We set this anyway since the @@ -3307,9 +3373,22 @@ die(SIGNAL_ARGS) * If it's safe to interrupt, and we're waiting for input or a lock, * service the interrupt immediately */ - if (ImmediateInterruptOK && InterruptHoldoffCount == 0 && - CritSectionCount == 0) + if ((ImmediateInterruptOK || ImmediateDieOK) && + InterruptHoldoffCount == 0 && CritSectionCount == 0) { + if (ImmediateDieOK && !DoingPqReading) + { + /* + * Getting here indicates that we have been interrupted while a + * data message is being sent to the client, so close the connection + * immediately, since sending any more bytes may cause a self + * deadlock (though we can handle this using pq_send_mutex_lock() now, + * it is better to avoid the unnecessary cost). 
+ */ + close(MyProcPort->sock); + whereToSendOutput = DestNone; + } + /* bump holdoff count to make ProcessInterrupts() a no-op */ /* until we are done getting ready for it */ InterruptHoldoffCount++; @@ -3486,6 +3565,7 @@ ProcessInterrupts(void) ProcDiePending = false; QueryCancelPending = false; /* ProcDie trumps QueryCancel */ ImmediateInterruptOK = false; /* not idle anymore */ + ImmediateDieOK = false; /* prevent re-entry */ DisableNotifyInterrupt(); DisableCatchupInterrupt(); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/backend/utils/init/globals.c ---------------------------------------------------------------------- diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 842b2d1..a0220a6 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -33,6 +33,8 @@ volatile bool ProcDiePending = false; volatile bool ClientConnectionLost = false; volatile bool ImmediateInterruptOK = false; volatile bool InterruptWhenCallingPLUDF = false; +volatile bool ImmediateDieOK = false; +volatile bool TermSignalReceived = false; // Make these signed integers (instead of uint32) to detect garbage negative values. 
volatile int32 InterruptHoldoffCount = 0; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/include/libpq/pqcomm.h ---------------------------------------------------------------------- diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h index 54798fb..6fae8f0 100644 --- a/src/include/libpq/pqcomm.h +++ b/src/include/libpq/pqcomm.h @@ -201,4 +201,8 @@ typedef struct PrimaryMirrorTransitionPacket uint32 dataLength; } PrimaryMirrorTransitionPacket; +/* the number of attempts to acquire the send mutex for the front + * end connection after detecting that the process is exiting */ +#define PQ_BUSY_TEST_COUNT_IN_EXITING 5 + #endif /* PQCOMM_H */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/include/miscadmin.h ---------------------------------------------------------------------- diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 80c3a17..aabef90 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -80,6 +80,8 @@ extern volatile bool ClientConnectionLost; /* these are marked volatile because they are examined by signal handlers: */ extern volatile bool ImmediateInterruptOK; +extern volatile bool ImmediateDieOK; +extern volatile bool TermSignalReceived; extern volatile bool InterruptWhenCallingPLUDF; extern PGDLLIMPORT volatile int32 InterruptHoldoffCount; extern PGDLLIMPORT volatile int32 CritSectionCount; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/62559633/src/include/tcop/tcopprot.h ---------------------------------------------------------------------- diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index ad45a3a..5fc42b7 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -65,6 +65,8 @@ extern void StatementCancelHandler(SIGNAL_ARGS); extern void FloatExceptionHandler(SIGNAL_ARGS); extern void prepare_for_client_read(void); extern void client_read_ended(void); +extern void prepare_for_client_write(void); 
+extern void client_write_ended(void); extern int PostgresMain(int argc, char *argv[], const char *username); extern long get_stack_depth_rlimit(void); extern void ResetUsage(void);
