Revert "HAWQ-1342. Fixed QE process hang in shared input scan on segment node"

   The fix introduced a hang regression in shared input scan queries, as described in
HAWQ-1371.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/61780e99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/61780e99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/61780e99

Branch: refs/heads/master
Commit: 61780e999c508973e3dccd460e80f47853928277
Parents: 9d0ea46
Author: amyrazz44 <[email protected]>
Authored: Tue Mar 7 16:02:39 2017 +0800
Committer: Ruilong Huo <[email protected]>
Committed: Fri Mar 10 15:59:16 2017 +0800

----------------------------------------------------------------------
 src/backend/executor/nodeShareInputScan.c | 49 +++++++++++++++-----------
 1 file changed, 28 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/61780e99/src/backend/executor/nodeShareInputScan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeShareInputScan.c 
b/src/backend/executor/nodeShareInputScan.c
index 74dbcb5..0f08848 100644
--- a/src/backend/executor/nodeShareInputScan.c
+++ b/src/backend/executor/nodeShareInputScan.c
@@ -40,6 +40,7 @@
 
 #include "postgres.h"
 
+#include "access/xact.h"
 #include "cdb/cdbvars.h"
 #include "executor/executor.h"
 #include "executor/nodeShareInputScan.h"
@@ -640,6 +641,10 @@ read_retry:
                goto read_retry;
        else
        {
+               if(fd >= 0)
+               {
+                       gp_retry_close(fd);
+               }
                elog(ERROR, "could not read from fifo: %m");
        }
        Assert(!"Never be here");
@@ -659,6 +664,10 @@ write_retry:
                goto write_retry;
        else
        {
+               if(fd >= 0)
+               {
+                       gp_retry_close(fd);
+               }
                elog(ERROR, "could not write to fifo: %m");
        }
 
@@ -785,14 +794,7 @@ shareinput_reader_waitready(int share_id, PlanGenerator 
planGen)
                {
                        int save_errno = errno;
                        elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait 
ready try again, errno %d ... ",
-                                                               share_id, 
currentSliceId, save_errno);
-                       if(save_errno == EBADF)
-                       {
-                               /* The file description is invalid, maybe this 
FD has been already closed by writer in some cases
-                                * we need to break here to avoid endless loop 
and continue to run CHECK_FOR_INTERRUPTS.
-                                */
-                               break;
-                       }
+                                       share_id, currentSliceId, save_errno);
                }
        }
        return (void *) pctxt;
@@ -923,12 +925,9 @@ writer_wait_for_acks(ShareInput_Lk_Context *pctxt, int 
share_id, int xslice)
                        int save_errno = errno;
                        elog(LOG, "SISC WRITER (shareid=%d, slice=%d): notify 
still wait for an answer, errno %d",
                                        share_id, currentSliceId, save_errno);
-                       if(save_errno == EBADF)
-                       {
-                               /* The file description is invalid, maybe this 
FD has been already closed by writer in some cases
-                                * we need to break here to avoid endless loop 
and continue to run CHECK_FOR_INTERRUPTS.
-                                */
-                               break;
+                       /*if error(except EINTR) happens in select, we just 
return to avoid endless loop*/
+                       if(errno != EINTR){
+                               return;
                        }
                }
        }
@@ -980,6 +979,21 @@ shareinput_writer_waitdone(void *ctxt, int share_id, int 
nsharer_xslice)
        while(ack_needed > 0)
        {
                CHECK_FOR_INTERRUPTS();
+
+               /*
+                * Writer won't wait for data reading done notification from 
readers if transaction is
+                * aborting. Readers may fail to send data reading done 
notification to writer in two
+                * cases:
+                *
+                *    1. The transaction is aborted due to interrupts or 
exceptions, i.e., user cancels
+                *       query, division by zero on some segment
+                *
+                *    2. Logic errors in reader which incur its unexpected 
exit, i.e., segmentation fault
+                */
+               if (IsAbortInProgress())
+               {
+                       break;
+               }
        
                MPP_FD_ZERO(&rset);
                MPP_FD_SET(pctxt->donefd, &rset);
@@ -1010,13 +1024,6 @@ shareinput_writer_waitdone(void *ctxt, int share_id, int 
nsharer_xslice)
                        int save_errno = errno;
                        elog(LOG, "SISC WRITER (shareid=%d, slice=%d): wait 
done time out once, errno %d",
                                        share_id, currentSliceId, save_errno);
-                       if(save_errno == EBADF)
-                       {
-                               /* The file description is invalid, maybe this 
FD has been already closed by writer in some cases
-                                * we need to break here to avoid endless loop 
and continue to run CHECK_FOR_INTERRUPTS.
-                                */
-                               break;
-                       }
                }
        }
 

Reply via email to