This is an automated email from the ASF dual-hosted git repository. maxyang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push: new e6773293d3 Remove the HTBL in motion e6773293d3 is described below commit e6773293d37acf1c860c08419daec3c6bdfc4601 Author: zhoujiaqi <zhouji...@hashdata.cn> AuthorDate: Tue Mar 25 11:36:56 2025 +0800 Remove the HTBL in motion In the previous modular refactoring of the interconnect, `MotionConn` was no longer exposed to CBDB kernel methods, so a hash table was added to cache `sent_record_typmod`. That design requires a `hash_search` operation every time `motion->sendtuple` is called. This change adds the `GetMotionSentRecordTypmod` method so that motion no longer needs to obtain `sent_record_typmod` from the hash table. --- contrib/interconnect/ic_common.c | 29 +++++++++++++++++++---------- contrib/interconnect/ic_common.h | 5 +++++ contrib/interconnect/ic_internal.h | 9 +++++++++ contrib/interconnect/ic_modules.c | 3 +++ contrib/interconnect/tcp/ic_tcp.c | 10 ---------- contrib/interconnect/udp/ic_udpifc.c | 9 --------- src/backend/cdb/motion/cdbmotion.c | 29 ++++++----------------------- src/include/cdb/cdbinterconnect.h | 23 ----------------------- src/include/cdb/ml_ipc.h | 4 ++++ 9 files changed, 46 insertions(+), 75 deletions(-) diff --git a/contrib/interconnect/ic_common.c b/contrib/interconnect/ic_common.c index 0847a15ec0..d629eb0969 100644 --- a/contrib/interconnect/ic_common.c +++ b/contrib/interconnect/ic_common.c @@ -296,9 +296,6 @@ createChunkTransportState(ChunkTransportState * transportStates, for (i = 0; i < pEntry->numConns; i++) { MotionConn *conn = NULL; - MotionConnKey motion_conn_key; - MotionConnSentRecordTypmodEnt *motion_conn_ent; - getMotionConn(pEntry, i, &conn); /* Initialize MotionConn entry. 
*/ @@ -310,13 +307,7 @@ createChunkTransportState(ChunkTransportState * transportStates, conn->stopRequested = false; conn->cdbProc = NULL; conn->remapper = NULL; - - motion_conn_key.mot_node_id = motNodeID; - motion_conn_key.conn_index = i; - - motion_conn_ent = (MotionConnSentRecordTypmodEnt *) hash_search(transportStates->conn_sent_record_typmod, - &motion_conn_key, HASH_ENTER, NULL); - motion_conn_ent->sent_record_typmod = 0; + conn->sent_record_typmod = 0; } return pEntry; @@ -544,6 +535,24 @@ GetMotionConnTupleRemapper(ChunkTransportState * transportStates, return conn->remapper; } + +int32 * +GetMotionSentRecordTypmod(ChunkTransportState * transportStates, + int16 motNodeID, + int16 targetRoute) +{ + MotionConn *conn; + ChunkTransportStateEntry *pEntry = NULL; + getChunkTransportState(transportStates, motNodeID, &pEntry); + + if (targetRoute == BROADCAST_SEGIDX) + conn = &pEntry->conns[0]; + else + conn = &pEntry->conns[targetRoute]; + + return &conn->sent_record_typmod; +} + /* * do nothing for tcp/proxy implement. */ diff --git a/contrib/interconnect/ic_common.h b/contrib/interconnect/ic_common.h index 14de6a09ce..b156a46fd6 100644 --- a/contrib/interconnect/ic_common.h +++ b/contrib/interconnect/ic_common.h @@ -137,6 +137,11 @@ extern TupleRemapper * GetMotionConnTupleRemapper(ChunkTransportState *transport int16 motNodeID, int16 targetRoute); +int32 * +GetMotionSentRecordTypmod(ChunkTransportState * transportStates, + int16 motNodeID, + int16 targetRoute); + extern void DirectPutRxBufferTCP(ChunkTransportState *transportStates, int motNodeID, int route); extern uint32 GetActiveMotionConnsTCP(void); #endif diff --git a/contrib/interconnect/ic_internal.h b/contrib/interconnect/ic_internal.h index 2e12634d38..77c08ee6e2 100644 --- a/contrib/interconnect/ic_internal.h +++ b/contrib/interconnect/ic_internal.h @@ -86,6 +86,15 @@ typedef struct MotionConn * be longer than about 50 chars, but * play it safe */ + /* + * used by the sender. 
+ * + * the typmod of last sent record type in current connection, + * if the connection is for broadcasting then we only check + * and update this attribute on connection 0. + */ + int32 sent_record_typmod; + /* * used by the receiver. * diff --git a/contrib/interconnect/ic_modules.c b/contrib/interconnect/ic_modules.c index 1c30d5639e..b582e8bdbe 100644 --- a/contrib/interconnect/ic_modules.c +++ b/contrib/interconnect/ic_modules.c @@ -59,6 +59,7 @@ MotionIPCLayer tcp_ipc_layer = { #endif .GetMotionConnTupleRemapper = GetMotionConnTupleRemapper, + .GetMotionSentRecordTypmod = GetMotionSentRecordTypmod, }; MotionIPCLayer proxy_ipc_layer = { @@ -97,6 +98,7 @@ MotionIPCLayer proxy_ipc_layer = { #endif .GetMotionConnTupleRemapper = GetMotionConnTupleRemapper, + .GetMotionSentRecordTypmod = GetMotionSentRecordTypmod, }; @@ -136,6 +138,7 @@ MotionIPCLayer udpifc_ipc_layer = { #endif .GetMotionConnTupleRemapper = GetMotionConnTupleRemapper, + .GetMotionSentRecordTypmod = GetMotionSentRecordTypmod, }; void diff --git a/contrib/interconnect/tcp/ic_tcp.c b/contrib/interconnect/tcp/ic_tcp.c index 284241d05f..39ad2f9c4e 100644 --- a/contrib/interconnect/tcp/ic_tcp.c +++ b/contrib/interconnect/tcp/ic_tcp.c @@ -1283,7 +1283,6 @@ SetupTCPInterconnect(EState *estate) /* we can have at most one of these. 
*/ ChunkTransportStateEntry *sendingChunkTransportState = NULL; ChunkTransportState *interconnect_context; - HASHCTL conn_sent_record_typmod_ctl; SIMPLE_FAULT_INJECTOR("interconnect_setup_palloc"); interconnect_context = palloc0(sizeof(ChunkTransportState)); @@ -1300,13 +1299,6 @@ SetupTCPInterconnect(EState *estate) interconnect_context->sliceTable = copyObject(sliceTable); interconnect_context->sliceId = sliceTable->localSlice; - conn_sent_record_typmod_ctl.keysize = sizeof(MotionConnKey); - conn_sent_record_typmod_ctl.entrysize = sizeof(MotionConnSentRecordTypmodEnt); - conn_sent_record_typmod_ctl.hcxt = CurrentMemoryContext; - - interconnect_context->conn_sent_record_typmod = hash_create( - "MotionConn sent record typmod mapping", 128, &conn_sent_record_typmod_ctl, HASH_CONTEXT | HASH_ELEM | HASH_BLOBS); - #ifdef ENABLE_IC_PROXY ic_proxy_backend_init_context(interconnect_context); #endif /* ENABLE_IC_PROXY */ @@ -2177,8 +2169,6 @@ TeardownTCPInterconnect(ChunkTransportState * transportStates, bool hasErrors) if (transportStates->states != NULL) pfree(transportStates->states); - if (transportStates->conn_sent_record_typmod) - hash_destroy(transportStates->conn_sent_record_typmod); pfree(transportStates); diff --git a/contrib/interconnect/udp/ic_udpifc.c b/contrib/interconnect/udp/ic_udpifc.c index d2fcd8cb45..63e8c9301d 100644 --- a/contrib/interconnect/udp/ic_udpifc.c +++ b/contrib/interconnect/udp/ic_udpifc.c @@ -3066,7 +3066,6 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) ChunkTransportStateEntry *sendingChunkTransportState = NULL; ChunkTransportState *interconnect_context; - HASHCTL conn_sent_record_typmod_ctl; pthread_mutex_lock(&ic_control_info.lock); @@ -3089,10 +3088,6 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) ic_control_info.ic_instance_id = sliceTable->ic_instance_id; } - conn_sent_record_typmod_ctl.keysize = sizeof(MotionConnKey); - conn_sent_record_typmod_ctl.entrysize = sizeof(MotionConnSentRecordTypmodEnt); - 
conn_sent_record_typmod_ctl.hcxt = CurrentMemoryContext; - interconnect_context = palloc0(sizeof(ChunkTransportState)); /* initialize state variables */ @@ -3106,8 +3101,6 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable) interconnect_context->incompleteConns = NIL; interconnect_context->sliceTable = copyObject(sliceTable); interconnect_context->sliceId = sliceTable->localSlice; - interconnect_context->conn_sent_record_typmod = hash_create( - "MotionConn sent record typmod mapping", 128, &conn_sent_record_typmod_ctl, HASH_CONTEXT | HASH_ELEM | HASH_BLOBS); mySlice = &interconnect_context->sliceTable->slices[sliceTable->localSlice]; @@ -3746,8 +3739,6 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState *transportStates, pfree(transportStates->states); transportStates->states = NULL; } - if (transportStates->conn_sent_record_typmod) - hash_destroy(transportStates->conn_sent_record_typmod); pfree(transportStates); } diff --git a/src/backend/cdb/motion/cdbmotion.c b/src/backend/cdb/motion/cdbmotion.c index ff6955282a..24b112cd1b 100644 --- a/src/backend/cdb/motion/cdbmotion.c +++ b/src/backend/cdb/motion/cdbmotion.c @@ -359,27 +359,10 @@ CheckAndSendRecordCache(MotionLayerState *mlStates, MotionNodeEntry *pMNEntry; TupleChunkListData tcList; MemoryContext oldCtxt; - bool sent_record_typmod_found = false; - MotionConnKey motion_conn_key; - MotionConnSentRecordTypmodEnt *motion_conn_ent; + int32 *conn_sent_record_typmod; - /* - * for broadcast we only mark sent_record_typmod for connection 0 for - * efficiency and convenience - */ - motion_conn_key.mot_node_id = motNodeID; - motion_conn_key.conn_index = targetRoute == BROADCAST_SEGIDX ? 
0 : targetRoute; - - motion_conn_ent = (MotionConnSentRecordTypmodEnt *)hash_search(transportStates->conn_sent_record_typmod, - &motion_conn_key, HASH_FIND, &sent_record_typmod_found); - - if (!sent_record_typmod_found) { - ereport(ERROR, - (errcode(ERRCODE_GP_INTERCONNECTION_ERROR), - errmsg("interconnect error: Unexpected Motion Node Id: %d, targetRoute: %d", - motNodeID, targetRoute), - errdetail("Fail to get sent_record_typmod from motion conntion"))); - } + conn_sent_record_typmod = CurrentMotionIPCLayer->GetMotionSentRecordTypmod(transportStates, motNodeID, targetRoute); + Assert(conn_sent_record_typmod); /* * Analyze tools. Do not send any thing if this slice is in the bit mask @@ -394,7 +377,7 @@ CheckAndSendRecordCache(MotionLayerState *mlStates, */ pMNEntry = getMotionNodeEntry(mlStates, motNodeID); - if (!ShouldSendRecordCache(motion_conn_ent->sent_record_typmod, &pMNEntry->ser_tup_info)) + if (!ShouldSendRecordCache(*conn_sent_record_typmod, &pMNEntry->ser_tup_info)) return; #ifdef AMS_VERBOSE_LOGGING @@ -404,7 +387,7 @@ CheckAndSendRecordCache(MotionLayerState *mlStates, /* Create and store the serialized form, and some stats about it. 
*/ oldCtxt = MemoryContextSwitchTo(mlStates->motion_layer_mctx); - SerializeRecordCacheIntoChunks(&pMNEntry->ser_tup_info, &tcList, motion_conn_ent->sent_record_typmod); + SerializeRecordCacheIntoChunks(&pMNEntry->ser_tup_info, &tcList, *conn_sent_record_typmod); MemoryContextSwitchTo(oldCtxt); @@ -432,7 +415,7 @@ CheckAndSendRecordCache(MotionLayerState *mlStates, /* cleanup */ clearTCList(&pMNEntry->ser_tup_info.chunkCache, &tcList); - UpdateSentRecordCache(&motion_conn_ent->sent_record_typmod); + UpdateSentRecordCache(conn_sent_record_typmod); } /* diff --git a/src/include/cdb/cdbinterconnect.h b/src/include/cdb/cdbinterconnect.h index eda5c59463..9f5ce7753d 100644 --- a/src/include/cdb/cdbinterconnect.h +++ b/src/include/cdb/cdbinterconnect.h @@ -178,18 +178,6 @@ struct ChunkTransportStateEntry; typedef struct MotionConn MotionConn; typedef struct ChunkTransportStateEntry ChunkTransportStateEntry; -typedef struct MotionConnKey -{ - int mot_node_id; - int conn_index; -} MotionConnKey; - -typedef struct MotionConnSentRecordTypmodEnt -{ - MotionConnKey key; - int32 sent_record_typmod; -} MotionConnSentRecordTypmodEnt; - typedef struct ChunkTransportState { /* array of per-motion-node chunk transport state @@ -223,17 +211,6 @@ typedef struct ChunkTransportState /* Estate pointer for this statement */ struct EState *estate; - /* - * used by the sender. - * - * the typmod of last sent record type in current connection, - * if the connection is for broadcasting then we only check - * and update this attribute on connection 0. 
- * - * mapping the MotionConn -> int32 - */ - HTAB* conn_sent_record_typmod; - /* ic_proxy backend context */ struct ICProxyBackendContext *proxyContext; } ChunkTransportState; diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h index a786a1c173..b39eaac60a 100644 --- a/src/include/cdb/ml_ipc.h +++ b/src/include/cdb/ml_ipc.h @@ -285,6 +285,10 @@ typedef struct MotionIPCLayer int16 motNodeID, int16 targetRoute); + int32 *(*GetMotionSentRecordTypmod) (ChunkTransportState *transportStates, + int16 motNodeID, + int16 targetRoute); + } MotionIPCLayer; /* MotionIPCLayer selected */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org For additional commands, e-mail: commits-h...@cloudberry.apache.org