This is an automated email from the ASF dual-hosted git repository. maxyang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
commit cdf9eb90fb36074f189014fe50ece3f7ef539596 Author: Juyi.lmz <[email protected]> AuthorDate: Thu Apr 11 15:07:05 2024 +0800 Fix "token for user id xxx doesn't exist" error (#17325) Call setup_endpoint_token_entry() from inside alloc_endpoint(), while ParallelCursorEndpointLock is still held, so that allocating an endpoint and setting up its token entry happen as one atomic operation. As a result, the functions alloc_endpoint() and free_endpoint() become opposite operations. --- src/backend/cdb/endpoint/cdbendpoint.c | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/backend/cdb/endpoint/cdbendpoint.c b/src/backend/cdb/endpoint/cdbendpoint.c index ea12790a20..d5a537a198 100644 --- a/src/backend/cdb/endpoint/cdbendpoint.c +++ b/src/backend/cdb/endpoint/cdbendpoint.c @@ -309,7 +309,6 @@ SetupEndpointExecState(TupleDesc tupleDesc, const char *cursorName, */ CurrentEndpointExecState->endpoint = alloc_endpoint(cursorName, dsm_segment_handle(CurrentEndpointExecState->dsmSeg)); - setup_endpoint_token_entry(); CurrentEndpointExecState->dest = CreateTupleQueueDestReceiver(shmMqHandle); (CurrentEndpointExecState->dest->rStartup)(CurrentEndpointExecState->dest, operation, tupleDesc); @@ -471,6 +470,12 @@ static Endpoint OwnLatch(&sharedEndpoints[i].ackDone); ret = &sharedEndpoints[i]; + /* + * setup the token entry here to ensure that the 'sharedEndpoints' + * and 'EndpointTokenHash' stay synchronized. + */ + setup_endpoint_token_entry(); + LWLockRelease(ParallelCursorEndpointLock); return ret; } @@ -542,6 +547,8 @@ create_and_connect_mq(TupleDesc tupleDesc, dsm_segment **mqSeg /* out */ , /* * Create/reuse EndpointTokenEntry for current session in shared memory. * EndpointTokenEntry is used for authentication in the retrieve sessions. + * + * Needs to be called with exclusive lock on ParallelCursorEndpointLock. 
*/ static void setup_endpoint_token_entry() @@ -554,7 +561,7 @@ setup_endpoint_token_entry() tag.sessionID = gp_session_id; tag.userID = GetUserId(); - LWLockAcquire(ParallelCursorEndpointLock, LW_EXCLUSIVE); + Assert(LWLockHeldByMeInMode(ParallelCursorEndpointLock, LW_EXCLUSIVE)); infoEntry = (EndpointTokenEntry *) hash_search(EndpointTokenHash, &tag, HASH_ENTER, &found); elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: Finish endpoint init. Found EndpointTokenEntry? %d", found); @@ -571,8 +578,6 @@ setup_endpoint_token_entry() infoEntry->refCount++; Assert(infoEntry->refCount <= MAX_ENDPOINT_SIZE); - - LWLockRelease(ParallelCursorEndpointLock); } /* @@ -691,6 +696,7 @@ detach_mq(dsm_segment *dsmSeg) * * Clean the Endpoint entry sender pid when endpoint finish it's * job or abort. + * * Needs to be called with exclusive lock on ParallelCursorEndpointLock. */ static void @@ -698,6 +704,7 @@ unset_endpoint_sender_pid(Endpoint *endpoint) { Assert(endpoint); Assert(!endpoint->empty); + Assert(LWLockHeldByMeInMode(ParallelCursorEndpointLock, LW_EXCLUSIVE)); elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: unset endpoint sender pid"); @@ -828,6 +835,7 @@ free_endpoint(Endpoint *endpoint) Assert(endpoint); Assert(!endpoint->empty); + Assert(LWLockHeldByMeInMode(ParallelCursorEndpointLock, LW_EXCLUSIVE)); elogif(gp_log_endpoints, LOG, "CDB_ENDPOINT: free endpoint '%s'", endpoint->name); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
