Repository: incubator-hawq
Updated Branches:
  refs/heads/master 17a9dc1e5 -> 55e5ab5a3
HAWQ-1326. Cancel the query earlier if one of the segments for the query crashes Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/55e5ab5a Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/55e5ab5a Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/55e5ab5a Branch: refs/heads/master Commit: 55e5ab5a3eadfa9ba022db86826980cda870f1ce Parents: 17a9dc1 Author: Paul Guo <[email protected]> Authored: Mon Feb 6 19:54:57 2017 +0800 Committer: Paul Guo <[email protected]> Committed: Tue Feb 14 17:12:27 2017 +0800 ---------------------------------------------------------------------- src/backend/cdb/dispatcher_mgt.c | 32 +++++---- src/backend/cdb/executormgr.c | 32 ++++++++- src/backend/resourcemanager/include/dynrm.h | 8 +++ src/backend/resourcemanager/resourcemanager.c | 77 ++++++++++++++++++++++ src/backend/resourcemanager/resourcepool.c | 30 +++++++++ src/backend/storage/ipc/ipci.c | 6 +- src/backend/tcop/pquery.c | 13 ++-- src/include/cdb/cdbutil.h | 3 + src/include/cdb/executormgr.h | 2 + 9 files changed, 181 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/cdb/dispatcher_mgt.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/dispatcher_mgt.c b/src/backend/cdb/dispatcher_mgt.c index 8ed3402..31c483e 100644 --- a/src/backend/cdb/dispatcher_mgt.c +++ b/src/backend/cdb/dispatcher_mgt.c @@ -42,7 +42,6 @@ #endif #include "cdb/cdbconn.h" /* SOCK_ERRNO */ - typedef enum DispMgtConstant { DISPMGT_POLL_TIME = 2 * 1000, } DispMgtConstant; @@ -288,14 +287,16 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL) { if (workermgr_should_query_stop(state)) { - 
write_log("+++++++++dispmgr_thread_func_run meets should query " - "stop before dispatching, entering error_cleanup"); + write_log("+++++++++%s meets should query " + "stop before dispatching, entering error_cleanup", + __func__); goto error_cleanup; } if (!executormgr_dispatch_and_run(data, executor)) { - write_log("+++++++++dispmgr_thread_func_run meets dispatch_and_run " - "problem when dispatching, entering error_cleanup"); + write_log("+++++++++%s meets dispatch_and_run " + "problem when dispatching, entering error_cleanup", + __func__); goto error_cleanup; } } @@ -309,14 +310,23 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) /* Check global state to abort query, this let poll process easier. */ if (workermgr_should_query_stop(state)){ - write_log("dispmgr_thread_func_run meets should query stop when " - "polling executors, entering error_cleanup"); + write_log("%s meets should query stop when " + "polling executors, entering error_cleanup", + __func__); goto error_cleanup; } /* Skip the stopped executor make the logic easy to understand. */ dispmgt_init_query_executor_in_group_iterator(group, &iterator, true); while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL) { + if (!executormgr_check_segment_status(executor)) + { + write_log("Detected one segment (Global ID: %d) is down, so " + "abort the query that is running or will run on it", + executormgr_get_ID(executor)); + goto error_cleanup; + } + /* * The fds array may shorter than executor array. * DO NOT mark executor stop! @@ -367,7 +377,7 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) dispmgt_init_query_executor_in_group_iterator(group, &iterator, true); while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL) { - int sockfd; + int __MAYBE_UNUSED sockfd; sockfd = executormgr_get_fd(executor); /* TODO: is that safe to call Assert() in a thread ? 
*/ @@ -376,7 +386,7 @@ dispmgt_thread_func_run(QueryExecutorGroup *group, struct WorkerMgrState *state) continue; if (!executormgr_consume(executor)) { - write_log("dispmgr_thread_func_run meets consume error for executor, entering error_cleanup"); + write_log("%s meets consume error for executor, entering error_cleanup", __func__); goto error_cleanup; } } @@ -388,7 +398,7 @@ error_cleanup: * 1. query cancel, result error, and poll error: mark the executor stop. * 2. connection error: mark the gang error. Set by workermgr_mark_executor_error(). */ - workermgr_set_state_cancel(state); + workermgr_set_state_cancel(state); dispmgt_init_query_executor_in_group_iterator(group, &iterator, false); while ((executor = dispmgt_get_query_executor_in_group_iterator(group, &iterator)) != NULL) { @@ -514,8 +524,6 @@ dispmgt_concurrent_connect(List *executors, int executors_num_per_thread) } PG_CATCH(); { - ListCell *lc; - workermgr_cancel_job(state); /* We have to clean up the executors. */ dispmgt_free_concurrent_connect_state(tasks); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/cdb/executormgr.c ---------------------------------------------------------------------- diff --git a/src/backend/cdb/executormgr.c b/src/backend/cdb/executormgr.c index f8efb3d..85d3381 100644 --- a/src/backend/cdb/executormgr.c +++ b/src/backend/cdb/executormgr.c @@ -42,6 +42,8 @@ #include "utils/guc_tables.h" /* TODO: manipulate gucs */ #include "portability/instr_time.h" /* Monitor the dispatcher performance */ +#include "resourcemanager/dynrm.h" + typedef enum ExecutorMgrConstant { EXECUTORMGR_CANCEL_ERROR_BUFFER_SIZE = 256, } ExecutorMgrConstant; @@ -320,6 +322,12 @@ executormgr_get_executor_result(QueryExecutor *executor) } int +executormgr_get_ID(QueryExecutor *executor) +{ + return executor->desc->segment->ID; +} + +int executormgr_get_fd(QueryExecutor *executor) { return PQsocket(executor->desc->conn); @@ -399,7 +407,7 @@ 
executormgr_is_dispatchable(QueryExecutor *executor) if (!executormgr_validate_conn(conn) || PQstatus(conn) == CONNECTION_BAD) { - write_log("function executormgr_is_dispatchable meets error, connection is bad."); + write_log("function %s meets error, connection is bad.", __func__); executormgr_catch_error(executor); return false; } @@ -471,6 +479,26 @@ error: return false; } +bool +executormgr_check_segment_status(QueryExecutor *executor) +{ + /* + * Cancel the query if a segment is down. QEs could hang in interconnect + * until timeout when one segment is down. This will cause QD keep polling + * until QE timeout. + */ + int ID = executormgr_get_ID(executor); + + if (IsSegmentDown(ID)) + { + cdbdisp_seterrcode(ERRCODE_GP_INTERCONNECTION_ERROR, -1, + executormgr_get_executor_result(executor)); + return false; + } + + return true; +} + /* * executormgr_consume * If there are data available for executor, use this interface to consume data. @@ -547,7 +575,7 @@ executormgr_consume(QueryExecutor *executor) connection_error: /* Let caller deal with connection error. 
*/ - write_log("function executormgr_consume meets error, connection is bad."); + write_log("function %s meets error, connection is bad.", __func__); executormgr_catch_error(executor); return false; } http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/resourcemanager/include/dynrm.h ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/include/dynrm.h b/src/backend/resourcemanager/include/dynrm.h index a6309c8..60b6c6a 100644 --- a/src/backend/resourcemanager/include/dynrm.h +++ b/src/backend/resourcemanager/include/dynrm.h @@ -324,4 +324,12 @@ int MainHandlerLoop_RMSEG(void); void checkAndBuildFailedTmpDirList(void); void switchIMAliveTarget(void); + +int SegmentStatus_ShmSize(void); +void SegmentStatusShmemReset(void); +void SegmentStatusShmemInit(void); +void MarkSegmentDown(int x); +void MarkSegmentUp(int x); +bool IsSegmentDown(int x); + #endif //DYNAMIC_RESOURCE_MANAGEMENT_H http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/resourcemanager/resourcemanager.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/resourcemanager.c b/src/backend/resourcemanager/resourcemanager.c index dda4271..652c83f 100644 --- a/src/backend/resourcemanager/resourcemanager.c +++ b/src/backend/resourcemanager/resourcemanager.c @@ -20,6 +20,7 @@ #include "envswitch.h" #include "dynrm.h" #include "utils/hashtable.h" +#include "cdb/cdbfts.h" #include "resourcemanager/resourcemanager.h" #include "resourceenforcer/resourceenforcer.h" @@ -67,6 +68,11 @@ extern bool FindMyDatabase(const char *name, Oid *db_id, Oid *db_tablespace); static char *probeDatabase = "template1"; +/* Bitmap to monitor segment health info (up/down). 
*/ +bits8 *shm_segment_status; +void MarkSegmentUp(int x); +void MarkSegmentDown(int x); + int loadAllQueueAndUser(void); int loadHostInformationIntoResourcePool(void); @@ -3014,3 +3020,74 @@ bool cleanedAllGRMContainers(void) return PRESPOOL->AddPendingContainerCount == 0 && PRESPOOL->RetPendingContainerCount == 0; } + +int SegmentStatus_ShmSize(void) +{ + return (FTS_MAX_DBS >> 3) + 1; +} + +void SegmentStatusShmemReset(void) +{ + /* For each bit, 0 means an invalid node (down or non-existent). */ + MemSet(shm_segment_status, 0, SegmentStatus_ShmSize()); +} + +void SegmentStatusShmemInit(void) +{ + bool found; + + shm_segment_status = (bits8 *)ShmemInitStruct( + "Segment status (up/down. Monitored by RM)", SegmentStatus_ShmSize(), &found); + + if (!shm_segment_status) + elog(FATAL, "could not initialize segment status bitmap (shared memory)."); + + if (found) + return; + + SegmentStatusShmemReset(); +} + +void MarkSegmentDown(int x) +{ + int octnum, bitnum; + + if (x < 0 || x >= FTS_MAX_DBS) + elog(ERROR, "Segment ID is out of range (%d)", x); + + octnum = x >> 3; + bitnum = x & 0x7; + + /* Clear the bit. */ + shm_segment_status[octnum] &= ~(1 << bitnum); +} + +void MarkSegmentUp(int x) +{ + int octnum, bitnum; + + if (x < 0 || x >= FTS_MAX_DBS) + elog(ERROR, "Segment ID is out of range (%d)", x); + + octnum = x >> 3; + bitnum = x & 0x7; + + /* Set the bit. 
*/ + shm_segment_status[octnum] |= (1 << bitnum); +} + +bool IsSegmentDown(int x) +{ + int octnum, bitnum; + + if (x < 0 || x >= FTS_MAX_DBS) + elog(ERROR, "Segment ID is out of range (%d)", x); + + octnum = x >> 3; + bitnum = x & 0x7; + + if ((shm_segment_status[octnum] & (1 << bitnum)) == 0) + return true; + + return false; +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/resourcemanager/resourcepool.c ---------------------------------------------------------------------- diff --git a/src/backend/resourcemanager/resourcepool.c b/src/backend/resourcemanager/resourcepool.c index 63357b4..9485dbf 100644 --- a/src/backend/resourcemanager/resourcepool.c +++ b/src/backend/resourcemanager/resourcepool.c @@ -78,6 +78,8 @@ int __DRM_NODERESPOOL_comp_ratioFree(void *arg, void *val1, void *val2); int __DRM_NODERESPOOL_comp_ratioAlloc(void *arg, void *val1, void *val2); int __DRM_NODERESPOOL_comp_combine(void *arg, void *val1, void *val2); +extern void MarkSegmentUp(int x); + /* * The balanced BST index comparing function. The segment containing most * available resource is ordered at the left most, the segment not in @@ -411,6 +413,8 @@ void cleanup_segment_config() PQExpBuffer sql = NULL; PGresult* result = NULL; + SegmentStatusShmemReset(); + sprintf(conninfo, "options='-c gp_session_role=UTILITY -c allow_system_table_mods=dml' " "dbname=template1 port=%d connect_timeout=%d", master_addr_port, CONNECT_TIMEOUT); conn = PQconnectdb(conninfo); @@ -647,6 +651,19 @@ void update_segment_status(int32_t id, char status, char* description) else if (status == SEGMENT_STATUS_DOWN) Assert(strlen(description) != 0); + /* For segment nodes only. 
*/ + if (id >= REGISTRATION_ORDER_OFFSET) + { + if (status == SEGMENT_STATUS_UP) + MarkSegmentUp(id - REGISTRATION_ORDER_OFFSET); + else if (status == SEGMENT_STATUS_DOWN) + MarkSegmentDown(id - REGISTRATION_ORDER_OFFSET); + else + elog(ERROR, "Unrecognized segment status character: '%c' " + "(Should be one of '%c' and '%c')", status, + SEGMENT_STATUS_UP, SEGMENT_STATUS_DOWN); + } + sprintf(conninfo, "options='-c gp_session_role=UTILITY -c allow_system_table_mods=dml' " "dbname=template1 port=%d connect_timeout=%d", master_addr_port, CONNECT_TIMEOUT); conn = PQconnectdb(conninfo); @@ -814,6 +831,19 @@ void add_segment_config_row(int32_t id, PQExpBuffer sql = NULL; PGresult* result = NULL; + /* For segment nodes only. */ + if (id >= REGISTRATION_ORDER_OFFSET) + { + if (status == SEGMENT_STATUS_UP) + MarkSegmentUp(id - REGISTRATION_ORDER_OFFSET); + else if (status == SEGMENT_STATUS_DOWN) + MarkSegmentDown(id - REGISTRATION_ORDER_OFFSET); + else + elog(ERROR, "Unrecognized segment status character: '%c' " + "(Should be one of '%c' and '%c')", status, + SEGMENT_STATUS_UP, SEGMENT_STATUS_DOWN); + } + sprintf(conninfo, "options='-c gp_session_role=UTILITY -c allow_system_table_mods=dml' " "dbname=template1 port=%d connect_timeout=%d", master_addr_port, CONNECT_TIMEOUT); conn = PQconnectdb(conninfo); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/storage/ipc/ipci.c ---------------------------------------------------------------------- diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index ad73af1..f65f0ff 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -66,6 +66,8 @@ #include "cdb/cdbtmpdir.h" #include "utils/session_state.h" +#include "resourcemanager/dynrm.h" + shmem_startup_hook_type shmem_startup_hook = NULL; static Size total_addin_request = 0; @@ -168,6 +170,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, 
PersistentRelfile_ShmemSize()); size = add_size(size, Pass2Recovery_ShmemSize()); size = add_size(size, FSCredShmemSize()); + size = add_size(size, SegmentStatus_ShmSize()); if (Gp_role == GP_ROLE_DISPATCH || Gp_role == GP_ROLE_UTILITY) { @@ -363,6 +366,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) FSCredShmemInit(); + SegmentStatusShmemInit(); #ifdef EXEC_BACKEND /* @@ -371,7 +375,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) if (!IsUnderPostmaster) ShmemBackendArrayAllocation(); #endif - + SPI_InitMemoryReservation(); /* http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/backend/tcop/pquery.c ---------------------------------------------------------------------- diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index efa5bce..0a5b9d5 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -832,7 +832,8 @@ AllocateResource(QueryResourceLife life, seg->hdfsHostname = qdseginfo->QD_HdfsHostName == NULL ? NULL : pstrdup(qdseginfo->QD_HdfsHostName); - seg->segindex = i; + seg->segindex = i; + seg->ID = qdseginfo->QD_SegInfo->ID; seg->master = qdseginfo->QD_SegInfo->master; seg->port = qdseginfo->QD_SegInfo->port; seg->standby = qdseginfo->QD_SegInfo->standby; @@ -849,8 +850,8 @@ AllocateResource(QueryResourceLife life, elog(DEBUG3, "Get allocated segment located at : %s:%d," "Address:%s," - "Master:%d,Standby:%d,Alive:%d,ID:%d, " - "HDFS Host:%s", + "Master:%d,Standby:%d,Alive:%d,internl index:%d, " + "Global ID:%d, HDFS Host:%s", seg->hostname, seg->port, seg->hostip, @@ -858,6 +859,7 @@ AllocateResource(QueryResourceLife life, seg->standby, seg->alive, seg->segindex, + seg->ID, (seg->hdfsHostname == NULL ? 
"NULL" : seg->hdfsHostname)); } } @@ -959,7 +961,6 @@ static void RemoveFromGlobalQueryResources(int resourceId, QueryResourceLife life) { ListCell *lc; - QueryResourceItem *newItem; MemoryContext old; if (life == QRL_NONE) @@ -991,8 +992,6 @@ static void SetResourcesAllocatedSucceed(int resourceId, QueryResourceLife life) { ListCell *lc; - QueryResourceItem *newItem; - MemoryContext old; if (life == QRL_NONE) { @@ -1019,7 +1018,7 @@ FreeResource(QueryResource *resource) ListCell *lc; int ret; char errorbuf[1024]; - bool found = false; + bool __MAYBE_UNUSED found = false; if (!resource) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/include/cdb/cdbutil.h ---------------------------------------------------------------------- diff --git a/src/include/cdb/cdbutil.h b/src/include/cdb/cdbutil.h index 23b4acc..7962067 100644 --- a/src/include/cdb/cdbutil.h +++ b/src/include/cdb/cdbutil.h @@ -127,6 +127,9 @@ typedef struct Segment { bool alive; int segindex; + + /* Global unique ID. 
*/ + int ID; } Segment; extern Segment *GetMasterSegment(void); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/55e5ab5a/src/include/cdb/executormgr.h ---------------------------------------------------------------------- diff --git a/src/include/cdb/executormgr.h b/src/include/cdb/executormgr.h index d1c562e..7d9f3c0 100644 --- a/src/include/cdb/executormgr.h +++ b/src/include/cdb/executormgr.h @@ -70,9 +70,11 @@ extern void executormgr_get_executor_connection_info(struct QueryExecutor *execu extern bool executormgr_is_stop(struct QueryExecutor *executor); extern bool executormgr_has_error(struct QueryExecutor *executor); extern int executormgr_get_executor_slice_id(struct QueryExecutor *executor); +extern int executormgr_get_ID(struct QueryExecutor *executor); extern int executormgr_get_fd(struct QueryExecutor *executor); extern bool executormgr_cancel(struct QueryExecutor * executor); extern bool executormgr_dispatch_and_run(struct DispatchData *data, struct QueryExecutor *executor); +extern bool executormgr_check_segment_status(struct QueryExecutor *executor); extern bool executormgr_consume(struct QueryExecutor *executor); extern bool executormgr_discard(struct QueryExecutor *executor); extern void executormgr_merge_error(struct QueryExecutor *exeutor);
