This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit cdf9eb90fb36074f189014fe50ece3f7ef539596
Author: Juyi.lmz <[email protected]>
AuthorDate: Thu Apr 11 15:07:05 2024 +0800

    Fix token for user id xxx doesn't exist (#17325)
    
    Merge the functions alloc_endpoint() and setup_endpoint_token_entry() so
    that they execute as one atomic operation while ParallelCursorEndpointLock
    is held. As a result, alloc_endpoint() and free_endpoint() become inverse
    operations.
---
 src/backend/cdb/endpoint/cdbendpoint.c | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/src/backend/cdb/endpoint/cdbendpoint.c b/src/backend/cdb/endpoint/cdbendpoint.c
index ea12790a20..d5a537a198 100644
--- a/src/backend/cdb/endpoint/cdbendpoint.c
+++ b/src/backend/cdb/endpoint/cdbendpoint.c
@@ -309,7 +309,6 @@ SetupEndpointExecState(TupleDesc tupleDesc, const char *cursorName,
         */
        CurrentEndpointExecState->endpoint =
                alloc_endpoint(cursorName, 
dsm_segment_handle(CurrentEndpointExecState->dsmSeg));
-       setup_endpoint_token_entry();
 
        CurrentEndpointExecState->dest = 
CreateTupleQueueDestReceiver(shmMqHandle);
        
(CurrentEndpointExecState->dest->rStartup)(CurrentEndpointExecState->dest, 
operation, tupleDesc);
@@ -471,6 +470,12 @@ static Endpoint
        OwnLatch(&sharedEndpoints[i].ackDone);
        ret = &sharedEndpoints[i];
 
+       /*
+        * setup the token entry here to ensure that the 'sharedEndpoints'
+        * and 'EndpointTokenHash' stay synchronized.
+        */
+       setup_endpoint_token_entry();
+
        LWLockRelease(ParallelCursorEndpointLock);
        return ret;
 }
@@ -542,6 +547,8 @@ create_and_connect_mq(TupleDesc tupleDesc, dsm_segment **mqSeg /* out */ ,
 /*
  * Create/reuse EndpointTokenEntry for current session in shared memory.
  * EndpointTokenEntry is used for authentication in the retrieve sessions.
+ *
+ * Needs to be called with exclusive lock on ParallelCursorEndpointLock.
  */
 static void
 setup_endpoint_token_entry()
@@ -554,7 +561,7 @@ setup_endpoint_token_entry()
        tag.sessionID = gp_session_id;
        tag.userID = GetUserId();
 
-       LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE);
+       Assert(LWLockHeldByMeInMode(ParallelCursorEndpointLock, LW_EXCLUSIVE));
        infoEntry = (EndpointTokenEntry *) hash_search(EndpointTokenHash, &tag, 
HASH_ENTER, &found);
        elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: Finish endpoint init. 
Found EndpointTokenEntry? %d", found);
 
@@ -571,8 +578,6 @@ setup_endpoint_token_entry()
 
        infoEntry->refCount++;
        Assert(infoEntry->refCount <= MAX_ENDPOINT_SIZE);
-
-       LWLockRelease(ParallelCursorEndpointLock);
 }
 
 /*
@@ -691,6 +696,7 @@ detach_mq(dsm_segment *dsmSeg)
  *
  * Clean the Endpoint entry sender pid when endpoint finish it's
  * job or abort.
+ *
  * Needs to be called with exclusive lock on ParallelCursorEndpointLock.
  */
 static void
@@ -698,6 +704,7 @@ unset_endpoint_sender_pid(Endpoint *endpoint)
 {
        Assert(endpoint);
        Assert(!endpoint->empty);
+       Assert(LWLockHeldByMeInMode(ParallelCursorEndpointLock, LW_EXCLUSIVE));
 
        elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: unset endpoint sender 
pid");
 
@@ -828,6 +835,7 @@ free_endpoint(Endpoint *endpoint)
 
        Assert(endpoint);
        Assert(!endpoint->empty);
+       Assert(LWLockHeldByMeInMode(ParallelCursorEndpointLock, LW_EXCLUSIVE));
 
        elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: free endpoint '%s'", 
endpoint->name);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to