Repository: incubator-hawq
Updated Branches:
  refs/heads/master 4ef7022e7 -> 339806f3a


fix share input scan bug for writer part


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/339806f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/339806f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/339806f3

Branch: refs/heads/master
Commit: 339806f3a40cf85686496412984e65ebfb481dbd
Parents: 4ef7022
Author: amyrazz44 <[email protected]>
Authored: Mon May 8 17:27:07 2017 +0800
Committer: Ruilong Huo <[email protected]>
Committed: Thu Jun 22 07:32:19 2017 +0800

----------------------------------------------------------------------
 src/backend/executor/nodeMaterial.c       |  38 +++++-
 src/backend/executor/nodeShareInputScan.c | 158 ++++++++++++++++++++++++-
 src/backend/utils/misc/guc.c              |  12 +-
 src/include/executor/nodeMaterial.h       |   1 +
 src/include/executor/nodeShareInputScan.h |   2 +
 src/include/utils/guc.h                   |   2 +
 6 files changed, 203 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/backend/executor/nodeMaterial.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeMaterial.c 
b/src/backend/executor/nodeMaterial.c
index f2b82b2..4589351 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -41,19 +41,21 @@
 #include "postgres.h"
 
 #include "executor/executor.h"
-#include "executor/nodeMaterial.h"
 #include "executor/instrument.h"        /* Instrumentation */
 #include "utils/tuplestorenew.h"
-
+#include "executor/nodeMaterial.h"
 #include "miscadmin.h"
 
 #include "cdb/cdbvars.h"
+#include "postmaster/primary_mirror_mode.h"
 
+
+/*
+ * Descriptor of the writer's SISC lock file.  mkLockFileForWriter() opens it
+ * and takes an exclusive flock() on it; ExecMaterial() closes it once
+ * shareinput_writer_notifyready() has returned.  Starts out as -1.
+ * NOTE(review): it is not reset to -1 after that close(), and the close is
+ * guarded by "> 0" rather than ">= 0" -- confirm intent.
+ */
+static int sisc_writer_lock_fd = -1;
 static void ExecMaterialExplainEnd(PlanState *planstate, struct StringInfoData 
*buf);
 static void ExecChildRescan(MaterialState *node, ExprContext *exprCtxt);
 static void DestroyTupleStore(MaterialState *node);
 static void ExecMaterialResetWorkfileState(MaterialState *node);
-
+static void mkLockFileForWriter(int size, int share_id, char * name);
 
 /* ----------------------------------------------------------------
  *             ExecMaterial
@@ -115,6 +117,7 @@ ExecMaterial(MaterialState *node)
 
                        ts = ntuplestore_create_readerwriter(rwfile_prefix, 
PlanStateOperatorMemKB((PlanState *)node) * 1024, true);
                        tsa = ntuplestore_create_accessor(ts, true);
+                       mkLockFileForWriter(MAXPGPATH, ma->share_id, "writer");
                }
                else
                {
@@ -247,6 +250,8 @@ ExecMaterial(MaterialState *node)
 
                                        node->share_lk_ctxt = 
shareinput_writer_notifyready(ma->share_id, ma->nsharer_xslice,
                                                        
estate->es_plannedstmt->planGen);
+                                       if(sisc_writer_lock_fd > 0)
+                                               close(sisc_writer_lock_fd);
                                }
                        }
                        return NULL;
@@ -759,3 +764,30 @@ ExecEagerFreeMaterial(MaterialState *node)
        }
 }
 
+
+/*
+ * mkLockFileForWriter
+ *
+ * Create (if needed) the writer's lock file for the share input scan
+ * identified by share_id and take an exclusive flock() on it.  The lock
+ * stays held for as long as sisc_writer_lock_fd remains open, i.e. until the
+ * descriptor is closed or the writer process exits.  Readers in
+ * shareinput_reader_waitready() probe this lock with LOCK_EX | LOCK_NB to
+ * find out whether the writer is still alive.
+ *
+ * size     - size of the buffer used to build the file name (callers pass MAXPGPATH)
+ * share_id - share id of the ShareInputScan being materialized
+ * name     - role suffix appended to the file name (callers pass "writer")
+ *
+ * Raises ERROR if the lock file cannot be created.
+ */
+static void mkLockFileForWriter(int size, int share_id, char * name)
+{
+       char *lock_file;
+       int rc;
+
+       lock_file = palloc0(size);
+       generate_lock_file_name(lock_file, size, share_id, name);
+       elog(DEBUG3, "The lock file for writer in SISC is %s", lock_file);
+
+       /* A read-only descriptor is sufficient for flock(); keep the file private. */
+       sisc_writer_lock_fd = open(lock_file, O_RDONLY | O_CREAT, S_IRUSR | S_IWUSR);
+       if (sisc_writer_lock_fd < 0)
+               elog(ERROR, "could not create lock file \"%s\" for writer in SISC: %m", lock_file);
+
+       /*
+        * Wait for the exclusive lock instead of using LOCK_NB.  A reader may
+        * hold the lock for an instant while probing it; if we gave up at that
+        * moment the file would stay unlocked, and readers would later take
+        * this live writer for a dead one.  Retry when a signal interrupts
+        * the wait.
+        */
+       do
+       {
+               rc = flock(sisc_writer_lock_fd, LOCK_EX);
+       } while (rc < 0 && errno == EINTR);
+
+       if (rc < 0)
+               elog(LOG, "could not lock lock file \"%s\" for writer in SISC: %m", lock_file);
+       else
+               elog(LOG, "Successfully locked lock file  \"%s\" for writer in SISC.", lock_file);
+
+       pfree(lock_file);
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/backend/executor/nodeShareInputScan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeShareInputScan.c 
b/src/backend/executor/nodeShareInputScan.c
index 049943b..88c695d 100644
--- a/src/backend/executor/nodeShareInputScan.c
+++ b/src/backend/executor/nodeShareInputScan.c
@@ -43,7 +43,6 @@
 #include "cdb/cdbvars.h"
 #include "executor/executor.h"
 #include "executor/nodeShareInputScan.h"
-
 #include "utils/tuplestorenew.h"
 #include "miscadmin.h"
 
@@ -552,11 +551,59 @@ static void sisc_lockname(char* p, int size, int 
share_id, const char* name)
        }
 }
 
+
+/*
+ * joint_lock_file_name
+ *
+ * Build a SISC lock file path by appending `name` to one of the fifo names
+ * stored in lk_ctxt.  Names beginning with "writer" are appended to
+ * lkname_ready; all other names are appended to lkname_done.  The result
+ * is palloc'd (MAXPGPATH bytes) and truncated to fit.  The caller owns it
+ * and must pfree() it.
+ */
+char *joint_lock_file_name(ShareInput_Lk_Context *lk_ctxt, char *name)
+{
+       bool is_writer = (strncmp("writer", name, strlen("writer")) == 0);
+       const char *base = is_writer ? lk_ctxt->lkname_ready : lk_ctxt->lkname_done;
+       char *path = palloc0(MAXPGPATH);
+
+       /* Same result as two bounded strncat()s: at most MAXPGPATH - 1 chars. */
+       snprintf(path, MAXPGPATH, "%s%s", base, name);
+       return path;
+}
+
+/*
+ * drop_lock_files
+ *
+ * Remove the writer's and the reader's SISC lock files, whose names are
+ * derived from the fifo names in lk_ctxt by joint_lock_file_name().
+ * A file that is already gone is not an error.  Other unlink() failures are
+ * only logged, because this runs while the lock context is being cleaned up.
+ */
+void drop_lock_files(ShareInput_Lk_Context *lk_ctxt)
+{
+       char *writer_lock_file;
+       char *reader_lock_file;
+
+       /*
+        * Call unlink() directly and inspect errno instead of probing with
+        * access() first, which avoids a check-then-act race.
+        */
+       writer_lock_file = joint_lock_file_name(lk_ctxt, "writer");
+       if (unlink(writer_lock_file) == 0)
+               elog(DEBUG3, "Drop writer's lock files %s in SISC", writer_lock_file);
+       else if (errno == ENOENT)
+               elog(DEBUG3, "Writer's lock files %s has been dropped already in SISC", writer_lock_file);
+       else
+               elog(LOG, "could not drop writer's lock file \"%s\" in SISC: %m", writer_lock_file);
+       pfree(writer_lock_file);
+
+       /* Unlink the reader's own path; writer_lock_file has been freed above. */
+       reader_lock_file = joint_lock_file_name(lk_ctxt, "reader");
+       if (unlink(reader_lock_file) == 0)
+               elog(DEBUG3, "Drop reader's lock files %s in SISC", reader_lock_file);
+       else if (errno == ENOENT)
+               elog(DEBUG3, "Reader's lock files %s has been dropped already in SISC", reader_lock_file);
+       else
+               elog(LOG, "could not drop reader's lock file \"%s\" in SISC: %m", reader_lock_file);
+       pfree(reader_lock_file);
+}
+
 static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
 {
        int err;
 
        elog(DEBUG1, "shareinput_clean_lk_ctxt cleanup lk ctxt %p", lk_ctxt);
+       
 
        if(lk_ctxt->readyfd >= 0)
        {
@@ -590,6 +637,10 @@ static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context 
*lk_ctxt)
                lk_ctxt->del_done = false;
        }
 
+       elog(DEBUG3, "Begin to drop all the lock files for SISC");
+       drop_lock_files(lk_ctxt);
+       elog(DEBUG3, "End of drop lock files for SISC");
+
        gp_free2 (lk_ctxt, sizeof(ShareInput_Lk_Context));
 }
 
@@ -666,6 +717,29 @@ write_retry:
        return 0;
 }
 
+
+
+/*
+ * generate_lock_file_name
+ *
+ * Fill p (a buffer of `size` bytes) with the lock file name used by a SISC
+ * reader or writer.  The name is the sisc_lockname() for "ready" when `name`
+ * begins with "writer" (otherwise for "done"), followed by `name` itself.
+ * The appended part is bounded by the space remaining in p.  This assumes
+ * sisc_lockname() leaves a NUL-terminated string shorter than size (TODO
+ * confirm).
+ */
+void generate_lock_file_name(char* p, int size, int share_id, const char* name)
+{
+       const char *fifo_kind = "done";
+
+       if (strncmp(name , "writer", strlen("writer")) == 0)
+               fifo_kind = "ready";
+
+       sisc_lockname(p, size, share_id, fifo_kind);
+       strncat(p, name, size - strlen(p) - 1);
+}
+
+
+
 /* 
  * Readiness (a) synchronization.
  *
@@ -709,6 +783,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator 
planGen)
        struct timeval tval;
        int n;
        char a;
+       int file_exists = -1;
+       int timeout_interval = 0;
+       bool flag = false; //A tag for file exists or not.
+       int lock_fd = -1;
+       int lock = -1;
+       bool is_lock_firsttime = true;
+       char *writer_lock_file = NULL; //current path for lock file.
 
        ShareInput_Lk_Context *pctxt = gp_malloc(sizeof(ShareInput_Lk_Context));
 
@@ -738,6 +819,9 @@ shareinput_reader_waitready(int share_id, PlanGenerator 
planGen)
        if(pctxt->donefd < 0)
                elog(ERROR, "could not open fifo \"%s\": %m", 
pctxt->lkname_done);
 
+       writer_lock_file = joint_lock_file_name(pctxt, "writer");
+       elog(DEBUG3, "The lock file of writer in SISC is %s", writer_lock_file);
+
        while(1)
        {
                CHECK_FOR_INTERRUPTS();
@@ -773,13 +857,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator 
planGen)
                        retry_read(pctxt->readyfd, &a, 1);
                        Assert(rwsize == 1 && a == 'a');
 
-                       elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait 
ready got writer's handshake",
+                       elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait 
ready got writer's handshake",
                                        share_id, currentSliceId);
 
                        if (planGen == PLANGEN_PLANNER)
                        {
                                /* For planner-generated plans, we send ack 
back after receiving the handshake */
-                               elog(DEBUG1, "SISC READER (shareid=%d, 
slice=%d): Wait ready writing ack back to writer",
+                               elog(LOG, "SISC READER (shareid=%d, slice=%d): 
Wait ready writing ack back to writer",
                                                share_id, currentSliceId);
 
 #if USE_ASSERT_CHECKING
@@ -793,8 +877,70 @@ shareinput_reader_waitready(int share_id, PlanGenerator 
planGen)
                }
                else if(n==0)
                {
-                       elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait 
ready time out once",
-                                       share_id, currentSliceId);
+                       file_exists = access(writer_lock_file, F_OK);   
+                       if(file_exists != 0)
+                       {
+                               elog(DEBUG3, "Wait lock file for writer time 
out interval is %d", timeout_interval);
+                               if(timeout_interval >= 
share_input_scan_wait_lockfile_timeout || flag == true) //If lock file never 
exists or disappeared, reader will no longer waiting for writer
+                               {
+                                       elog(LOG, "SISC READER (shareid=%d, 
slice=%d): Wait ready time out and break",
+                                               share_id, currentSliceId);
+                                       pfree(writer_lock_file);
+                                       break;
+                               }
+                               timeout_interval += tval.tv_sec * 1000 + 
tval.tv_usec;
+                       }
+                       else
+                       {
+                               elog(LOG, "writer lock file of 
shareinput_reader_waitready() is %s", writer_lock_file);
+                               flag = true;
+                               lock_fd = open(writer_lock_file, O_RDONLY);
+                               if(lock_fd < 0)
+                               {
+                                       elog(DEBUG3, "Open writer's lock file 
%s failed!, error number is %d", writer_lock_file, errno);
+                                       continue;
+                               }
+                               lock = flock(lock_fd, LOCK_EX | LOCK_NB);
+                               if(lock == -1)
+                               {
+                                       /*
+                                        * Reader try to lock the lock file 
which writer created until locked the lock file successfully 
+                     * which means that writer process quit. If reader lock 
the lock file failed, it means that writer
+                                        * process is healthy.
+                                        */
+                                       elog(DEBUG3, "Lock writer's lock file 
%s failed!, error number is %d", writer_lock_file, errno);
+                               }
+                               else if(lock == 0)
+                               {
+                                       /*
+                                        * There is one situation to consider 
about.
+                                        * Writer need a time interval to lock 
the lock file after the lock file has been created.
+                                        * So, if reader lock the lock file 
ahead of writer, we should unlock it.
+                                        * If reader lock the lock file after 
writer, it means that writer process has abort.
+                                        * We should break the loop to make 
sure reader no longer wait for writer.
+                                        */  
+                                       if(is_lock_firsttime == true)  
+                                       {
+                                               lock = flock(lock_fd, LOCK_UN); 
+                                               is_lock_firsttime = false;
+                                               elog(DEBUG3, "Lock writer's 
lock file %s first time successfully in SISC! Unlock it.", writer_lock_file);
+                                               continue;
+                                       }
+                                       else
+                                       {
+                                               elog(LOG, "Lock writer's lock 
file %s successfully in SISC!", writer_lock_file);
+                                               /* Retry to close the fd in 
case there is interruption from signal */
+                                               while ((close(lock_fd) < 0) && 
(errno == EINTR))
+                                               {
+                                                       elog(DEBUG3, "Failed to 
close SISC temporary file due to strerror(errno)");
+                                               }
+                                               pfree(writer_lock_file);
+                                               break; 
+                                       }
+                               }
+                               elog(DEBUG1, "SISC READER (shareid=%d, 
slice=%d): Wait ready time out once",
+                                               share_id, currentSliceId);
+                       }
                }
                else
                {
@@ -802,6 +948,7 @@ shareinput_reader_waitready(int share_id, PlanGenerator 
planGen)
                        elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait 
ready try again, errno %d ... ",
                                        share_id, currentSliceId, save_errno);
                }
+
        }
        return (void *) pctxt;
 }
@@ -853,7 +1000,6 @@ shareinput_writer_notifyready(int share_id, int xslice, 
PlanGenerator planGen)
        pctxt->donefd = open(pctxt->lkname_done, O_RDWR, 0600);
        if(pctxt->donefd < 0)
                elog(ERROR, "could not open fifo \"%s\": %m", 
pctxt->lkname_done);
-
        for(n=0; n<xslice; ++n)
        {
 #if USE_ASSERT_CHECKING

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6769d3b..87f44c2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -668,6 +668,8 @@ bool                gp_cte_sharing = false;
 
 char      *gp_idf_deduplicate_str;
 
+int share_input_scan_wait_lockfile_timeout;
+
 /* gp_disable_catalog_access_on_segment */
 bool gp_disable_catalog_access_on_segment = false;
 
@@ -6685,6 +6687,15 @@ static struct config_int ConfigureNamesInt[] =
                &metadata_cache_max_hdfs_file_num,
                524288, 32768, 8388608, NULL, NULL
        },
+       {
+               {"share_input_scan_wait_lockfile_timeout", PGC_USERSET, DEVELOPER_OPTIONS,
+                       gettext_noop("timeout (in millisecond) for waiting lock file which writer creates."),
+                       NULL
+               },
+               &share_input_scan_wait_lockfile_timeout,
+               /*
+                * Boot value 300000 ms (5 minutes).  The upper bound must be at
+                * least the boot value, so it is INT_MAX.
+                */
+               300000, 1, INT_MAX, NULL, NULL
+       },
+
 
        /* End-of-list marker */
        {
@@ -8349,7 +8360,6 @@ static struct config_string ConfigureNamesString[] =
                NULL, NULL, NULL
        },
 
-
        /* End-of-list marker */
        {
                {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/include/executor/nodeMaterial.h
----------------------------------------------------------------------
diff --git a/src/include/executor/nodeMaterial.h 
b/src/include/executor/nodeMaterial.h
index 509f78c..41eb6f7 100644
--- a/src/include/executor/nodeMaterial.h
+++ b/src/include/executor/nodeMaterial.h
@@ -15,6 +15,7 @@
 #define NODEMATERIAL_H
 
 #include "nodes/execnodes.h"
+#include "executor/nodeShareInputScan.h"
 
 extern int     ExecCountSlotsMaterial(Material *node);
 extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int 
eflags);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/include/executor/nodeShareInputScan.h
----------------------------------------------------------------------
diff --git a/src/include/executor/nodeShareInputScan.h 
b/src/include/executor/nodeShareInputScan.h
index 23025b9..730ffe1 100644
--- a/src/include/executor/nodeShareInputScan.h
+++ b/src/include/executor/nodeShareInputScan.h
@@ -30,6 +30,7 @@
 #define NODESHAREINPUTSCAN_H
 
 #include "nodes/execnodes.h"
+
 extern int ExecCountSlotsShareInputScan(ShareInputScan* node);
 extern ShareInputScanState *ExecInitShareInputScan(ShareInputScan *node, 
EState *estate, int eflags);
 extern TupleTableSlot *ExecShareInputScan(ShareInputScanState *node);
@@ -52,4 +53,5 @@ static inline gpmon_packet_t * 
GpmonPktFromShareInputState(ShareInputScanState *
        return &node->ss.ps.gpmon_pkt;
 }
 
+extern void generate_lock_file_name(char* p, int size, int share_id, const 
char* name);
 #endif   /* NODESHAREINPUTSCAN_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/include/utils/guc.h
----------------------------------------------------------------------
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 546584f..530abe5 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -444,6 +444,8 @@ extern bool optimizer_prefer_scalar_dqa_multistage_agg;
 extern bool optimizer_parallel_union;
 extern bool optimizer_array_constraints;
 
+/* Timeout for shareinputscan writer/reader wait for lock files */ 
+extern int share_input_scan_wait_lockfile_timeout;
 
 /* fallback in ranger ACL check */
 extern int information_schema_namespace_oid;

Reply via email to