Repository: incubator-hawq Updated Branches: refs/heads/master 820d97404 -> 270575d8a
HAWQ-1458. Fix share input scan bug for writer part. Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/270575d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/270575d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/270575d8 Branch: refs/heads/master Commit: 270575d8aca18b6073212c2fed44aedc3c417615 Parents: 820d974 Author: amyrazz44 <[email protected]> Authored: Mon May 8 17:27:07 2017 +0800 Committer: rlei <[email protected]> Committed: Wed Aug 23 17:57:50 2017 +0800 ---------------------------------------------------------------------- src/backend/executor/nodeMaterial.c | 35 +++++- src/backend/executor/nodeShareInputScan.c | 161 ++++++++++++++++++++++++- src/backend/utils/misc/guc.c | 12 +- src/include/executor/nodeMaterial.h | 2 + src/include/executor/nodeShareInputScan.h | 3 + src/include/utils/guc.h | 2 + 6 files changed, 205 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/270575d8/src/backend/executor/nodeMaterial.c ---------------------------------------------------------------------- diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c index f2b82b2..551db21 100644 --- a/src/backend/executor/nodeMaterial.c +++ b/src/backend/executor/nodeMaterial.c @@ -41,19 +41,20 @@ #include "postgres.h" #include "executor/executor.h" -#include "executor/nodeMaterial.h" #include "executor/instrument.h" /* Instrumentation */ #include "utils/tuplestorenew.h" - +#include "executor/nodeMaterial.h" #include "miscadmin.h" #include "cdb/cdbvars.h" +#include "postmaster/primary_mirror_mode.h" + static void ExecMaterialExplainEnd(PlanState *planstate, struct StringInfoData *buf); static void ExecChildRescan(MaterialState *node, ExprContext *exprCtxt); static void DestroyTupleStore(MaterialState *node); static void ExecMaterialResetWorkfileState(MaterialState *node); - +static void mkLockFileForWriter(int size, int share_id, char * name); /* ---------------------------------------------------------------- * ExecMaterial @@ -115,6 +116,7 @@ ExecMaterial(MaterialState *node) ts = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)node) * 1024, true); tsa = ntuplestore_create_accessor(ts, true); + mkLockFileForWriter(MAXPGPATH, ma->share_id, "writer"); } else { @@ -759,3 +761,30 @@ ExecEagerFreeMaterial(MaterialState *node) } } + +/* + * mkLockFileForWriter + * + * Create a unique lock file for writer, then use flock() to lock/unlock the lock file. + * We can make sure the lock file will be locked forerver until the writer process quits. + */ +static void mkLockFileForWriter(int size, int share_id, char * name) +{ + char *lock_file; + int lock; + + lock_file = (char *)palloc0(size); + generate_lock_file_name(lock_file, size, share_id, name); + elog(DEBUG3, "The lock file for writer in SISC is %s", lock_file); + sisc_writer_lock_fd = open(lock_file, O_CREAT, S_IRWXU); + if(sisc_writer_lock_fd < 0) + { + elog(ERROR, "Could not create lock file %s for writer in SISC. The error number is %d", lock_file, errno); + } + lock = flock(sisc_writer_lock_fd, LOCK_EX | LOCK_NB); + if(lock == -1) + elog(DEBUG3, "Could not lock lock file \"%s\" for writer in SISC . The error number is %d", lock_file, errno); + else if(lock == 0) + elog(LOG, "Successfully locked lock file \"%s\" for writer in SISC.", lock_file); + pfree(lock_file); +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/270575d8/src/backend/executor/nodeShareInputScan.c ---------------------------------------------------------------------- diff --git a/src/backend/executor/nodeShareInputScan.c b/src/backend/executor/nodeShareInputScan.c index 049943b..b9937bf 100644 --- a/src/backend/executor/nodeShareInputScan.c +++ b/src/backend/executor/nodeShareInputScan.c @@ -43,7 +43,6 @@ #include "cdb/cdbvars.h" #include "executor/executor.h" #include "executor/nodeShareInputScan.h" - #include "utils/tuplestorenew.h" #include "miscadmin.h" @@ -552,6 +551,52 @@ static void sisc_lockname(char* p, int size, int share_id, const char* name) } } + +char *joint_lock_file_name(ShareInput_Lk_Context *lk_ctxt, char *name) +{ + char *lock_file = palloc0(MAXPGPATH); + + if(strncmp("writer", name, strlen("writer")) ==0 ) + { + strncat(lock_file, lk_ctxt->lkname_ready, MAXPGPATH - strlen(lock_file) - 1); + } + else + { + strncat(lock_file, lk_ctxt->lkname_done, MAXPGPATH - strlen(lock_file) - 1); + } + strncat(lock_file, name, MAXPGPATH - strlen(lock_file) -1); + return lock_file; +} + +void drop_lock_files(ShareInput_Lk_Context *lk_ctxt) +{ + char *writer_lock_file = NULL; + char *reader_lock_file = NULL; + + writer_lock_file = joint_lock_file_name(lk_ctxt, "writer"); + if(access(writer_lock_file, F_OK) == 0) + { + elog(DEBUG3, "Drop writer's lock files %s in SISC", writer_lock_file); + unlink(writer_lock_file); + } + else + { + elog(DEBUG3, "Writer's lock files %s has been dropped already in SISC", writer_lock_file); + } + pfree(writer_lock_file); + reader_lock_file = joint_lock_file_name(lk_ctxt, "reader"); + if(access(reader_lock_file, F_OK) == 0) + { + elog(DEBUG3, "Drop reader's lock files %s in SISC", reader_lock_file); + unlink(writer_lock_file); + } + else + { + elog(DEBUG3, "Reader's lock files %s has been dropped already in SISC", reader_lock_file); + } + pfree(reader_lock_file); +} + static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt) { int err; @@ -590,6 +635,10 @@ static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt) lk_ctxt->del_done = false; } + elog(DEBUG3, "Begin to drop all the lock files for SISC"); + drop_lock_files(lk_ctxt); + elog(DEBUG3, "End of drop lock files for SISC"); + gp_free2 (lk_ctxt, sizeof(ShareInput_Lk_Context)); } @@ -666,6 +715,29 @@ write_retry: return 0; } + + +/* + * generate_lock_file_name + * + * Called by reader or writer to make the unique lock file name. + */ +void generate_lock_file_name(char* p, int size, int share_id, const char* name) +{ + if (strncmp(name , "writer", strlen("writer")) == 0) + { + sisc_lockname(p, size, share_id, "ready"); + strncat(p, name, size - strlen(p) - 1); + } + else + { + sisc_lockname(p, size, share_id, "done"); + strncat(p, name, size - strlen(p) - 1); + } +} + + + /* * Readiness (a) synchronization. * @@ -709,6 +781,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) struct timeval tval; int n; char a; + int file_exists = -1; + int timeout_interval = 0; + bool flag = false; //A tag for file exists or not. + int lock_fd = -1; + int lock = -1; + bool is_lock_firsttime = true; + char *writer_lock_file = NULL; //current path for lock file. ShareInput_Lk_Context *pctxt = gp_malloc(sizeof(ShareInput_Lk_Context)); @@ -738,6 +817,9 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) if(pctxt->donefd < 0) elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done); + writer_lock_file = joint_lock_file_name(pctxt, "writer"); + elog(DEBUG3, "The lock file of writer in SISC is %s", writer_lock_file); + while(1) { CHECK_FOR_INTERRUPTS(); @@ -773,13 +855,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) retry_read(pctxt->readyfd, &a, 1); Assert(rwsize == 1 && a == 'a'); - elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake", + elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake", share_id, currentSliceId); if (planGen == PLANGEN_PLANNER) { /* For planner-generated plans, we send ack back after receiving the handshake */ - elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer", + elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer", share_id, currentSliceId); #if USE_ASSERT_CHECKING @@ -793,8 +875,70 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) } else if(n==0) { - elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once", - share_id, currentSliceId); + file_exists = access(writer_lock_file, F_OK); + if(file_exists != 0) + { + elog(DEBUG3, "Wait lock file for writer time out interval is %d", timeout_interval); + if(timeout_interval >= share_input_scan_wait_lockfile_timeout || flag == true) //If lock file never exists or disappeared, reader will no longer waiting for writer + { + elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready time out and break", + share_id, currentSliceId); + pfree(writer_lock_file); + break; + } + timeout_interval += tval.tv_sec * 1000 + tval.tv_usec; + } + else + { + elog(LOG, "writer lock file of shareinput_reader_waitready() is %s", writer_lock_file); + flag = true; + lock_fd = open(writer_lock_file, O_RDONLY); + if(lock_fd < 0) + { + elog(DEBUG3, "Open writer's lock file %s failed!, error number is %d", writer_lock_file, errno); + continue; + } + lock = flock(lock_fd, LOCK_EX | LOCK_NB); + if(lock == -1) + { + /* + * Reader try to lock the lock file which writer created until locked the lock file successfully + * which means that writer process quit. If reader lock the lock file failed, it means that writer + * process is healthy. + */ + elog(DEBUG3, "Lock writer's lock file %s failed!, error number is %d", writer_lock_file, errno); + } + else if(lock == 0) + { + /* + * There is one situation to consider about. + * Writer need a time interval to lock the lock file after the lock file has been created. + * So, if reader lock the lock file ahead of writer, we should unlock it. + * If reader lock the lock file after writer, it means that writer process has abort. + * We should break the loop to make sure reader no longer wait for writer. + */ + if(is_lock_firsttime == true) + { + lock = flock(lock_fd, LOCK_UN); + is_lock_firsttime = false; + elog(DEBUG3, "Lock writer's lock file %s first time successfully in SISC! Unlock it.", writer_lock_file); + continue; + } + else + { + elog(LOG, "Lock writer's lock file %s successfully in SISC!", writer_lock_file); + /* Retry to close the fd in case there is interruption from signal */ + while ((close(lock_fd) < 0) && (errno == EINTR)) + { + elog(DEBUG3, "Failed to close SISC temporary file due to strerror(errno)"); + } + pfree(writer_lock_file); + break; + } + } + elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once", + share_id, currentSliceId); + } } else { @@ -802,6 +946,7 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen) elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready try again, errno %d ... ", share_id, currentSliceId, save_errno); } + } return (void *) pctxt; } @@ -853,7 +998,6 @@ shareinput_writer_notifyready(int share_id, int xslice, PlanGenerator planGen) pctxt->donefd = open(pctxt->lkname_done, O_RDWR, 0600); if(pctxt->donefd < 0) elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done); - for(n=0; n<xslice; ++n) { #if USE_ASSERT_CHECKING @@ -935,6 +1079,11 @@ writer_wait_for_acks(ShareInput_Lk_Context *pctxt, int share_id, int xslice) elog(DEBUG1, "SISC WRITER (shareid=%d, slice=%d): notify ready succeed 1, xslice remaining %d", share_id, currentSliceId, ack_needed); } + if(ack_needed == 0 && sisc_writer_lock_fd > 0) + { + close(sisc_writer_lock_fd); + } + } else if(numReady==0) { http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/270575d8/src/backend/utils/misc/guc.c ---------------------------------------------------------------------- diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 6769d3b..ac29d87 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -668,6 +668,8 @@ bool gp_cte_sharing = false; char *gp_idf_deduplicate_str; +int share_input_scan_wait_lockfile_timeout; + /* gp_disable_catalog_access_on_segment */ bool gp_disable_catalog_access_on_segment = false; @@ -6685,6 +6687,15 @@ static struct config_int ConfigureNamesInt[] = &metadata_cache_max_hdfs_file_num, 524288, 32768, 8388608, NULL, NULL }, + { + {"share_input_scan_wait_lockfile_timeout", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("timeout (in millisecond) for waiting lock file which writer creates."), + NULL + }, + &share_input_scan_wait_lockfile_timeout, + 300000, 1, INT_MAX, NULL, NULL + }, + /* End-of-list marker */ { @@ -8349,7 +8360,6 @@ static struct config_string ConfigureNamesString[] = NULL, NULL, NULL }, - /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/270575d8/src/include/executor/nodeMaterial.h ---------------------------------------------------------------------- diff --git a/src/include/executor/nodeMaterial.h b/src/include/executor/nodeMaterial.h index 509f78c..187824b 100644 --- a/src/include/executor/nodeMaterial.h +++ b/src/include/executor/nodeMaterial.h @@ -15,7 +15,9 @@ #define NODEMATERIAL_H #include "nodes/execnodes.h" +#include "executor/nodeShareInputScan.h" +static int sisc_writer_lock_fd = -1; extern int ExecCountSlotsMaterial(Material *node); extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags); extern TupleTableSlot *ExecMaterial(MaterialState *node); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/270575d8/src/include/executor/nodeShareInputScan.h ---------------------------------------------------------------------- diff --git a/src/include/executor/nodeShareInputScan.h b/src/include/executor/nodeShareInputScan.h index 23025b9..aeb88a7 100644 --- a/src/include/executor/nodeShareInputScan.h +++ b/src/include/executor/nodeShareInputScan.h @@ -30,6 +30,8 @@ #define NODESHAREINPUTSCAN_H #include "nodes/execnodes.h" +#include "executor/nodeMaterial.h" + extern int ExecCountSlotsShareInputScan(ShareInputScan* node); extern ShareInputScanState *ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags); extern TupleTableSlot *ExecShareInputScan(ShareInputScanState *node); @@ -52,4 +54,5 @@ static inline gpmon_packet_t * GpmonPktFromShareInputState(ShareInputScanState * return &node->ss.ps.gpmon_pkt; } +extern void generate_lock_file_name(char* p, int size, int share_id, const char* name); #endif /* NODESHAREINPUTSCAN_H */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/270575d8/src/include/utils/guc.h ---------------------------------------------------------------------- diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 546584f..922fe0c 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -444,6 +444,8 @@ extern bool optimizer_prefer_scalar_dqa_multistage_agg; extern bool optimizer_parallel_union; extern bool optimizer_array_constraints; +/* Timeout for shareinputscan writer/reader wait for lock files */ +extern int share_input_scan_wait_lockfile_timeout; /* fallback in ranger ACL check */ extern int information_schema_namespace_oid;
