Repository: incubator-hawq Updated Branches: refs/heads/master 492e8b71c -> 2a077062d
HAWQ-1208. Porting gpdb interconnect fix to hawq This commit includes the following commits from gpdb: ------------------------------------------ commit ce9cf07073946644b9dbc9a2fa0c1ca1179c57ee Author: xiongg1 <[email protected]> Date: Wed Jun 10 19:31:05 2015 -0400 [JIRA: MPP-25589] interconnect timeout bug commit 615934aa7cf87eab9b258cb56eae7766aabab979 Author: tangp4 <[email protected]> Date: Tue Jul 7 22:00:24 2015 -0400 [JIRA: MPP-25497] Fix the coverity defect CIDs: 16672 commit ae75ad81020533e0acc69daba086581dbca68ed2 Author: TangPengzhou <[email protected]> Date: Thu May 21 17:36:30 2015 +0800 [JIRA: MPP-25497] Shutdown interconnect thread before shmem_exit() Sometimes, if a FATAL level error occurs, shmem_exit() frees contexts that are still being accessed by the ic thread, which causes an unexpected segmentation fault. commit 2d7d0735f5b862d62a84d446f6dd2ce44b0df250 Author: Gang Xiong <[email protected]> Date: Sun Jan 3 13:37:34 2016 -0500 [JIRA:MPP-26123] Handle corner case of interconnect ack packet When the ack packet is resent with UDPIC_FLAGS_STOP set, we need to make sure the sender is aware of that. Otherwise, the sender will hang there forever. commit ad8328423661cb64e7b905df4928e48d532653b9 Author: Heikki Linnakangas <[email protected]> Date: Sun Aug 30 00:11:17 2015 +0300 [MPP-25631] Remove unnecessary #includes. No particular urgency to clean up just these, just something that caught my eye while browsing the code. 
commit 65809c0bf53a665a16e467331ea6e7d65aee54c6 Author: xiongg1 <[email protected]> Date: Mon Jun 15 19:36:55 2015 -0400 [JIRA: MPP-25590] Wrong error message when socket is exhausted on master commit a343ed5bfd992b10137ad232f241be72857d6ea2 Author: gpadmin <gpadmin@g187> Date: Thu Oct 27 18:58:54 2016 +0000 Add GUC called gp_interconnect_tcp_listener_backlog for tcp interconnect to control the backlog param of listen call commit eda343b5050e31764154698f25e88d6f3fa7e957 Author: Kenan Yao <[email protected]> Date: Tue Oct 25 16:10:05 2016 +0800 Fix a bug in function destroyConnHashTable which frees a wrong pointer and should cause SIGSEGV. Signed-off-by: Pengzhou Tang <[email protected]> commit 616f3c8f0ea372e01d166f4a6c52c6075a74ecd3 Author: Pengzhou Tang <[email protected]> Date: Tue Apr 5 10:50:55 2016 +0800 Fix coverity issue for 7e8f391dfdd commit 7e8f391dfdd8945573d8b621533626813f8f7684 Author: tangpengzhou <[email protected]> Date: Mon Mar 28 23:47:48 2016 +0000 Fix tcp socket/port leak when interconnect type is udp or udpifc Since gpdb don't allow changing interconnect type after connection started, it's unnecessary to allocate a tcp socket and port for hot switching from udp/udpifc to tcp. commit 7e8f391dfdd8945573d8b621533626813f8f7684 Author: tangpengzhou <[email protected]> Date: Mon Mar 28 23:47:48 2016 +0000 Fix tcp socket/port leak when interconnect type is udp or udpifc Since gpdb don't allow changing interconnect type after connection started, it's unnecessary to allocate a tcp socket and port for hot switching from udp/udpifc to tcp. commit 8ec73f12c997777cae321c301a079f2cae2914fc Author: Pengzhou Tang <[email protected]> Date: Thu Mar 3 15:36:06 2016 +0800 Fix incorrect EOS warning message generated by direct-dispatch type queries QD should not expect end-of-stream comes from QEs who is not members of direct dispatch and should not report warning message. 
commit 21973ab2282701eff45f9b9448525da266843ca4 Author: Pengzhou Tang <[email protected]> Date: Thu Mar 24 16:16:11 2016 +0800 Fix ic thread waiting error in utility mode Interconnect thread is not created in utility mode, ic_rx_thread_created is initialized to true which cause WaitInterconnectQuitUDP() to wait a non-existent ic thread. commit ba4b8ab4bce0cb62e7b1d3124012a20849434d81 Author: xiong-gang <[email protected]> Date: Fri Nov 18 17:10:41 2016 +0800 Correct 'extraSeq' in ack packet after stop is requested If the ack packet in doSendStopMessageUDPIFC() is lost, QE will keep sending status packet, and QD will ack it in handleDataPacket(). We need make sure the 'extraSeq' is equal to 'seq' in the ack packet so that QE can update the capacity. Or else, QE will hang for ever. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/2a077062 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/2a077062 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/2a077062 Branch: refs/heads/master Commit: 2a077062d30892d7c6372d1e7995234ae0aa95d5 Parents: 492e8b7 Author: Ming LI <[email protected]> Authored: Fri Dec 9 19:35:12 2016 +0800 Committer: ivan <[email protected]> Committed: Tue Dec 27 14:26:35 2016 +0800 ---------------------------------------------------------------------- src/backend/cdb/motion/cdbmotion.c | 7 ++ src/backend/cdb/motion/ic_common.c | 124 +++++++++++++++++++- src/backend/cdb/motion/ic_tcp.c | 8 ++ src/backend/cdb/motion/ic_udp.c | 75 +++++++++++- src/backend/parser/analyze.c | 5 +- src/backend/storage/ipc/ipc.c | 40 ++++++- src/backend/utils/misc/faultinjector.c | 5 +- src/include/cdb/ml_ipc.h | 11 ++ src/include/utils/faultinjector.h | 2 + src/test/regress/expected/icudp_full.out | 17 +++ src/test/regress/sql/icudp_full.sql | 16 +++ tools/bin/hawqpylib/programs/clsInjectFault.py | 1 + 12 files changed, 295 insertions(+), 16 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/cdb/motion/cdbmotion.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/motion/cdbmotion.c b/src/backend/cdb/motion/cdbmotion.c index 03a9633..a237703 100644 --- a/src/backend/cdb/motion/cdbmotion.c +++ b/src/backend/cdb/motion/cdbmotion.c @@ -750,6 +750,13 @@ EndMotionLayerNode(MotionLayerState *mlStates, int16 motNodeID, bool flushCommLa { pCSEntry = &pMNEntry->ready_tuple_lists[i]; + /* + * QD should not expect end-of-stream comes from QEs who is not members of + * direct dispatch + */ + if (!pCSEntry->init) + continue; + if (pMNEntry->preserve_order && gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/cdb/motion/ic_common.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/motion/ic_common.c b/src/backend/cdb/motion/ic_common.c index 6484a11..0961a25 100644 --- a/src/backend/cdb/motion/ic_common.c +++ b/src/backend/cdb/motion/ic_common.c @@ -36,13 +36,10 @@ #include "postgres.h" -#include <pthread.h> - #include "nodes/execnodes.h" /* Slice, SliceTable */ #include "nodes/pg_list.h" #include "nodes/print.h" #include "utils/memutils.h" -#include "utils/hsearch.h" #include "miscadmin.h" #include "libpq/libpq-be.h" #include "libpq/ip.h" @@ -322,8 +319,10 @@ InitMotionLayerIPC(void) /*activated = false;*/ savedSeqServerFd = -1; - InitMotionTCP(&TCP_listenerFd, &tcp_listener); - InitMotionUDP(&UDP_listenerFd, &udp_listener); + if (Gp_interconnect_type == INTERCONNECT_TYPE_TCP) + InitMotionTCP(&TCP_listenerFd, &tcp_listener); + else if (Gp_interconnect_type == INTERCONNECT_TYPE_UDP) + InitMotionUDP(&UDP_listenerFd, &udp_listener); Gp_listener_port = (udp_listener<<16) | tcp_listener; @@ -1020,3 +1019,118 @@ void adjustMasterRouting(Slice 
*recvSlice) } } } + +void +SendDummyPacket(void) +{ + int sockfd = -1; + int ret = -1; + struct addrinfo* addrs = NULL; + struct addrinfo* rp = NULL; + struct addrinfo hint; + uint16 udp_listenner; + char port_str[32] = {0}; + char* dummy_pkt = "stop it"; + /* + * Get address info from interconnect udp listenner port + */ + udp_listenner = (Gp_listener_port >> 16) & 0x0ffff; + snprintf(port_str, sizeof(port_str), "%d", udp_listenner); + + MemSet(&hint, 0, sizeof(hint)); + hint.ai_socktype = SOCK_DGRAM; + hint.ai_family = AF_UNSPEC; /* Allow for IPv4 or IPv6 */ + +#ifdef AI_NUMERICSERV + hint.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; /* Never do name resolution */ +#else + hint.ai_flags = AI_NUMERICHOST; /* Never do name resolution */ +#endif + + ret = pg_getaddrinfo_all(NULL, port_str, &hint, &addrs); + if (ret || !addrs) + { + elog(LOG, "Send dummy packet failed, pg_getaddrinfo_all(): %s", strerror(errno)); + goto send_error; + } + + for (rp = addrs; rp != NULL; rp = rp->ai_next) + { + /* Create socket according to pg_getaddrinfo_all() */ + sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); + if (sockfd < 0) + { + continue; + } + + if (!pg_set_noblock(sockfd)) + { + if (sockfd >= 0) + closesocket(sockfd); + continue; + } + break; + } + + if (rp == NULL) + { + elog(LOG, "Send dummy packet failed, create socket failed: %s", strerror(errno)); + goto send_error; + } + + /* + * Send a dummy package to the interconnect listener, try 10 times + */ + int counter = 0; + while (counter < 10) + { + counter++; + ret = sendto(sockfd, dummy_pkt, strlen(dummy_pkt), 0, rp->ai_addr, rp->ai_addrlen); + if(ret < 0) + { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) + { + continue; + } + else + { + elog(LOG, "Send dummy packet failed, sendto failed: %s", strerror(errno)); + goto send_error; + } + } + break; + } + + if (counter >= 10) + { + elog(LOG, "Send dummy packet failed, sendto failed: %s", strerror(errno)); + goto send_error; + } + + 
pg_freeaddrinfo_all(hint.ai_family, addrs); + close(sockfd); + return; + +send_error: + + if (addrs) + { + pg_freeaddrinfo_all(hint.ai_family, addrs); + } + if (sockfd != -1) + { + close(sockfd); + } + return; +} + +/* +* WaitInterconnectQuit +* +* Wait for the ic thread to quit, don't clean any resource owned by ic thread +*/ +void +WaitInterconnectQuit(void) +{ + WaitInterconnectQuitUDP(); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/cdb/motion/ic_tcp.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/motion/ic_tcp.c b/src/backend/cdb/motion/ic_tcp.c index 5d3b16b..28592c7 100644 --- a/src/backend/cdb/motion/ic_tcp.c +++ b/src/backend/cdb/motion/ic_tcp.c @@ -312,8 +312,12 @@ setupTCPListeningSocket(int backlog, int *listenerSocketFd, uint16 *listenerPort break; /* Success */ close(fd); + fd = -1; } + fun = "bind"; + if (fd == -1) + goto error; /* Make socket non-blocking. */ fun = "fcntl(O_NONBLOCK)"; @@ -1469,6 +1473,10 @@ SetupTCPInterconnect(EState *estate) expectedTotalIncoming += activeNumProcs; } + + if (expectedTotalIncoming > listenerBacklog) + ereport(WARNING, (errmsg("SetupTCPInterconnect: too many expected incoming connections(%d), Interconnect setup might possibly fail", expectedTotalIncoming), + errhint("Try enlarging the gp_interconnect_tcp_listener_backlog GUC value and OS net.core.somaxconn parameter"))); if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) ereport(DEBUG1, (errmsg("SetupInterconnect will activate " http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/cdb/motion/ic_udp.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/motion/ic_udp.c b/src/backend/cdb/motion/ic_udp.c index bcc959b..0c31224 100644 --- a/src/backend/cdb/motion/ic_udp.c +++ b/src/backend/cdb/motion/ic_udp.c @@ -48,6 +48,7 @@ #include "utils/atomic.h" #include "utils/builtins.h" #include 
"utils/debugbreak.h" +#include "utils/faultinjector.h" #include "utils/pg_crc.h" #include "port/pg_crc32c.h" @@ -1815,6 +1816,9 @@ destroyConnHashTable(ConnHashTable *ht) pfree(ht->table); else free(ht->table); + + ht->table = NULL; + ht->size = 0; } /* @@ -4587,7 +4591,7 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntr break; } - if (pkt->seq <= ackConn->receivedAckSeq) + if (pkt->seq < ackConn->receivedAckSeq) { if (DEBUG1 >= log_min_messages) write_log("ack with bad seq?! expected (%d, %d] got %d flags 0x%x, capacity %d consumedSeq %d", ackConn->receivedAckSeq, ackConn->sentSeq, pkt->seq, pkt->flags, ackConn->capacity, ackConn->consumedSeq); @@ -4606,6 +4610,13 @@ handleAcks(ChunkTransportState *transportStates, ChunkTransportStateEntry *pEntr /* continue to deal with acks */ } + if (pkt->seq == ackConn->receivedAckSeq) + { + if (DEBUG1 >= log_min_messages) + write_log("ack with bad seq?! expected (%d, %d] got %d flags 0x%x, capacity %d consumedSeq %d", ackConn->receivedAckSeq, ackConn->sentSeq, pkt->seq, pkt->flags, ackConn->capacity, ackConn->consumedSeq); + break; + } + /* deal with a regular ack. */ if (pkt->flags & UDPIC_FLAGS_ACK) { @@ -5394,7 +5405,7 @@ checkExpirationCapacityFC(ChunkTransportState *transportStates, ChunkTransportSt uint64 now = getCurrentTime(); uint64 elapsed = now - ic_control_info.lastPacketSendTime; - if (elapsed >= (timeout * 1000)) + if (elapsed >= ((uint64)timeout * 1000)) { ICBufferLink *bufLink = icBufferListFirst(&conn->unackQueue); ICBuffer *buf = GET_ICBUFFER_FROM_PRIMARY(bufLink); @@ -5777,6 +5788,18 @@ doSendStopMessageUDP(ChunkTransportState *transportStates, int16 motNodeID) * We will skip sending ACKs to those connections. 
*/ +#ifdef FAULT_INJECTOR + if (FaultInjector_InjectFaultIfSet( + InterconnectStopAckIsLost, + DDLNotSpecified, + "" /* databaseName */, + "" /* tableName */) == FaultInjectorTypeSkip) + { + pthread_mutex_unlock(&ic_control_info.lock); + continue; + } +#endif + if (conn->peer.ss_family == AF_INET || conn->peer.ss_family == AF_INET6) { uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0; @@ -5977,7 +6000,9 @@ handleDataPacket(MotionConn *conn, icpkthdr *pkt, struct sockaddr_storage *peer, #ifdef AMS_VERBOSE_LOGGING logPkt("STATUS QUERY MESSAGE", pkt); #endif - setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, conn->conn_info.seq - 1, conn->conn_info.extraSeq); + uint32 seq = conn->conn_info.seq > 0 ? conn->conn_info.seq - 1 : 0; + uint32 extraSeq = conn->stopRequested ? seq : conn->conn_info.extraSeq; + setAckSendParam(param, conn, UDPIC_FLAGS_CAPACITY | UDPIC_FLAGS_ACK | conn->conn_info.flags, seq, extraSeq); return false; } @@ -6224,7 +6249,9 @@ rxThreadFunc(void *arg) if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0)) { if (DEBUG1 >= log_min_messages) + { write_log("udp-ic: rx-thread shutting down"); + } break; } @@ -6250,6 +6277,15 @@ rxThreadFunc(void *arg) n = poll(&nfd, 1, RX_THREAD_POLL_TIMEOUT); + if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0)) + { + if (DEBUG1 >= log_min_messages) + { + write_log("udp-ic: rx-thread shutting down"); + } + break; + } + if (n < 0) { if (errno == EINTR) @@ -6285,6 +6321,15 @@ rxThreadFunc(void *arg) read_count = recvfrom(UDP_listenerFd, (char *)pkt, Gp_max_packet_size, 0, (struct sockaddr *)&peer, &peerlen); + if (compare_and_swap_32(&ic_control_info.shutdown, 1, 0)) + { + if (DEBUG1 >= log_min_messages) + { + write_log("udp-ic: rx-thread shutting down"); + } + break; + } + if (DEBUG5 >= log_min_messages) write_log("received inbound len %d", read_count); @@ -6877,3 +6922,27 @@ dumpConnections(ChunkTransportStateEntry *pEntry, const char *fname) } 
fclose(ofile); } + +void +WaitInterconnectQuitUDP(void) +{ + if (Gp_role == GP_ROLE_UTILITY) + { + return; + } + + /* + * Just in case ic thread is waiting on the locks. + */ + pthread_mutex_unlock(&ic_control_info.errorLock); + pthread_mutex_unlock(&ic_control_info.lock); + + compare_and_swap_32(&ic_control_info.shutdown, 0, 1); + + if (ic_control_info.threadCreated) + { + SendDummyPacket(); + pthread_join(ic_control_info.threadHandle, NULL); + } + ic_control_info.threadCreated = false; +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/parser/analyze.c ---------------------------------------------------------------------- diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 6ae62b3..5024389 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -85,9 +85,8 @@ #include "parser/parse_cte.h" #include "parser/parsetree.h" #include "rewrite/rewriteManip.h" -#include "utils/acl.h" #include "utils/builtins.h" -#include "utils/fmgroids.h" +#include "utils/datum.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -97,8 +96,6 @@ #include "cdb/cdbhash.h" #include "cdb/cdbsreh.h" -#include "executor/spi.h" - /* temporary rule to control whether we generate RULEs or not -- for testing */ bool enable_partition_rules = false; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/storage/ipc/ipc.c ---------------------------------------------------------------------- diff --git a/src/backend/storage/ipc/ipc.c b/src/backend/storage/ipc/ipc.c index 4934f80..dd7aa85 100644 --- a/src/backend/storage/ipc/ipc.c +++ b/src/backend/storage/ipc/ipc.c @@ -23,12 +23,13 @@ #include <unistd.h> #include <sys/stat.h> +#include "cdb/cdbdisp.h" #include "miscadmin.h" #ifdef PROFILE_PID_DIR #include "postmaster/autovacuum.h" #endif #include "storage/ipc.h" - +#include "libpq/pqsignal.h" /* * This flag is set during proc_exit() to change ereport()'s behavior, @@ -42,8 +43,7 @@ 
bool proc_exit_inprogress = false; * (or in the parent postmaster). */ static bool atexit_callback_setup = false; - - +extern void WaitInterconnectQuit(void); /* ---------------------------------------------------------------- * exit() handling stuff @@ -143,6 +143,16 @@ void proc_exit_prepare(int code) { /* + * If we came here from any critical section, we don't have safe way to + * clean up shared memory or transaction state. Though it's not a pleasant + * solution, this is better than messing up database. This is the least + * desirable bail-out, and whenever you should see this situation, you + * should consider to resolve the actual programming error. + */ + if (CritSectionCount > 0) + elog(PANIC, "process is dying from critical section"); + + /* * Once we set this flag, we are committed to exit. Any ereport() will * NOT send control back to the main loop, but right back here. */ @@ -162,6 +172,30 @@ proc_exit_prepare(int code) InterruptHoldoffCount = 1; CritSectionCount = 0; + /* + * Also clear the error context stack, to prevent error callbacks + * from being invoked by any elog/ereport calls made during proc_exit. + * Whatever context they might want to offer is probably not relevant, + * and in any case they are likely to fail outright after we've done + * things like aborting any open transaction. (In normal exit scenarios + * the context stack should be empty anyway, but it might not be in the + * case of elog(FATAL) for example.) + */ + error_context_stack = NULL; + + /* + * Make sure interconnect thread quit before shmem_exit() in FATAL case. + * Otherwise, shmem_exit() may free MemoryContex of MotionConns in connHtab unexpectedly; + * + * For example: PORTAL_MULTI_QUERY strategy doesn't bind estate with portal, + * so when fatal occurs, MotionConns of estate don't get removed through + * TeardownInterconnect(), but MemoryContex of these MotionConns are freed. 
+ * + * It's ok to shutdown Interconnect background thread here, process is dying, no + * necessary to receive more motion data. + */ + WaitInterconnectQuit(); + /* do our shared memory exits first */ shmem_exit(code); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/backend/utils/misc/faultinjector.c ---------------------------------------------------------------------- diff --git a/src/backend/utils/misc/faultinjector.c b/src/backend/utils/misc/faultinjector.c index 5c47c35..192b884 100644 --- a/src/backend/utils/misc/faultinjector.c +++ b/src/backend/utils/misc/faultinjector.c @@ -324,7 +324,9 @@ FaultInjectorIdentifierEnumToString[] = { _("opt_task_allocate_string_buffer"), /* inject fault while allocating string buffer */ _("runaway_cleanup"), - /* inject fault before cleaning up a runaway query */ + /* inject fault before cleaning up a runaway query */ + _("interconnect_stop_ack_is_lost"), + /* inject fault in interconnect to skip sending the stop ack */ _("not recognized"), }; @@ -985,6 +987,7 @@ FaultInjector_NewHashEntry( case FinishPreparedTransactionAbortPass1AbortingCreateNeeded: case FinishPreparedTransactionAbortPass2AbortingCreateNeeded: + case InterconnectStopAckIsLost: case SyncPersistentTable: case XLOGInsert: http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/include/cdb/ml_ipc.h ---------------------------------------------------------------------- diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h index 2b37c4f..2276b42 100644 --- a/src/include/cdb/ml_ipc.h +++ b/src/include/cdb/ml_ipc.h @@ -84,6 +84,16 @@ extern void InitMotionLayerIPC(void); */ extern void CleanUpMotionLayerIPC(void); +/* + * Wait interconnect thread to quit, called when proc exit. + */ +extern void WaitInterconnectQuit(void); + +/* +* Send a dummy packet to interconnect thread to exit poll() immediately +*/ +extern void SendDummyPacket(void); + /* Returns the fd of the socket that connects to the seqserver. 
This value * is -1 if it has not been setup. */ @@ -335,6 +345,7 @@ extern void markUDPConnInactive(MotionConn *conn); extern void CleanupMotionTCP(void); extern void CleanupMotionUDP(void); +extern void WaitInterconnectQuitUDP(void); extern void adjustMasterRouting(Slice *recvSlice); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/include/utils/faultinjector.h ---------------------------------------------------------------------- diff --git a/src/include/utils/faultinjector.h b/src/include/utils/faultinjector.h index 06a3dfc..b7cf10f 100644 --- a/src/include/utils/faultinjector.h +++ b/src/include/utils/faultinjector.h @@ -215,6 +215,8 @@ typedef enum FaultInjectorIdentifier_e { RunawayCleanup, + InterconnectStopAckIsLost, + /* INSERT has to be done before that line */ FaultInjectorIdMax, http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/test/regress/expected/icudp_full.out ---------------------------------------------------------------------- diff --git a/src/test/regress/expected/icudp_full.out b/src/test/regress/expected/icudp_full.out index c1b8d91..f2d58a3 100644 --- a/src/test/regress/expected/icudp_full.out +++ b/src/test/regress/expected/icudp_full.out @@ -749,3 +749,20 @@ RESET gp_log_interconnect; RESET log_min_messages; RESET search_path; DROP SCHEMA ic_udp_test CASCADE; +/* + * If ack packet is lost in doSendStopMessageUDP(), transaction with cursor + * should still be able to commit. +*/ +--start_ignore +drop table if exists ic_test_1; +NOTICE: table "ic_test_1" does not exist, skipping +--end_ignore +create table ic_test_1 as select i as c1, i as c2 from generate_series(1, 100000) i; +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column(s) named 'c1' as the Greenplum Database data distribution key for this table. +HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
+begin; +declare ic_test_cursor_c1 cursor for select * from ic_test_1; +\! hawqfaultinjector -q -f interconnect_stop_ack_is_lost -y reset -s 1 +\! hawqfaultinjector -q -f interconnect_stop_ack_is_lost -y skip -s 1 +commit; +drop table ic_test_1; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/src/test/regress/sql/icudp_full.sql ---------------------------------------------------------------------- diff --git a/src/test/regress/sql/icudp_full.sql b/src/test/regress/sql/icudp_full.sql index 116f89a..0a02b27 100644 --- a/src/test/regress/sql/icudp_full.sql +++ b/src/test/regress/sql/icudp_full.sql @@ -372,3 +372,19 @@ RESET log_min_messages; RESET search_path; DROP SCHEMA ic_udp_test CASCADE; + + +/* + * If ack packet is lost in doSendStopMessageUDP(), transaction with cursor + * should still be able to commit. +*/ +--start_ignore +drop table if exists ic_test_1; +--end_ignore +create table ic_test_1 as select i as c1, i as c2 from generate_series(1, 100000) i; +begin; +declare ic_test_cursor_c1 cursor for select * from ic_test_1; +\! hawqfaultinjector -q -f interconnect_stop_ack_is_lost -y reset -s 1 +\! 
hawqfaultinjector -q -f interconnect_stop_ack_is_lost -y skip -s 1 +commit; +drop table ic_test_1; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/2a077062/tools/bin/hawqpylib/programs/clsInjectFault.py ---------------------------------------------------------------------- diff --git a/tools/bin/hawqpylib/programs/clsInjectFault.py b/tools/bin/hawqpylib/programs/clsInjectFault.py index 7281338..0ec1dcd 100644 --- a/tools/bin/hawqpylib/programs/clsInjectFault.py +++ b/tools/bin/hawqpylib/programs/clsInjectFault.py @@ -420,6 +420,7 @@ class HAWQInjectFaultProgram: "fail_qe_when_do_query (inject fault when QE actually working, set error)" \ "fail_qe_when_begin_parquet_scan (inject fault when begin scan parquet table, set error)"\ "fail_qe_when_parquet_get_next (inject fault when get next, set error)"\ + "interconnect_stop_ack_is_lost (inject fault in interconnect to skip sending the stop ack), " \ "all (affects all faults injected, used for 'status' and 'reset'), ") addTo.add_option("-c", "--ddl_statement", dest="ddlStatement", type="string", metavar="ddlStatement",
