This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git


The following commit(s) were added to refs/heads/main by this push:
     new e6773293d3 Remove the HTBL in motion
e6773293d3 is described below

commit e6773293d37acf1c860c08419daec3c6bdfc4601
Author: zhoujiaqi <zhouji...@hashdata.cn>
AuthorDate: Tue Mar 25 11:36:56 2025 +0800

    Remove the HTBL in motion
    
    In the previous interconnect modular reconstruction, `MotionConn`
    was no longer exposed to the CBDB kernel method, and we add a
    HashTable used to cache `sent_record_typmod`. This change requires
    a `hash_search` operation every time `motion->sendtuple` is called.
    
    Current change adds the `GetMotionSentRecordTypmod` method so that
    motion no longer needs to obtain `sent_record_typmod` from HashTable.
---
 contrib/interconnect/ic_common.c     | 29 +++++++++++++++++++----------
 contrib/interconnect/ic_common.h     |  5 +++++
 contrib/interconnect/ic_internal.h   |  9 +++++++++
 contrib/interconnect/ic_modules.c    |  3 +++
 contrib/interconnect/tcp/ic_tcp.c    | 10 ----------
 contrib/interconnect/udp/ic_udpifc.c |  9 ---------
 src/backend/cdb/motion/cdbmotion.c   | 29 ++++++-----------------------
 src/include/cdb/cdbinterconnect.h    | 23 -----------------------
 src/include/cdb/ml_ipc.h             |  4 ++++
 9 files changed, 46 insertions(+), 75 deletions(-)

diff --git a/contrib/interconnect/ic_common.c b/contrib/interconnect/ic_common.c
index 0847a15ec0..d629eb0969 100644
--- a/contrib/interconnect/ic_common.c
+++ b/contrib/interconnect/ic_common.c
@@ -296,9 +296,6 @@ createChunkTransportState(ChunkTransportState * 
transportStates,
        for (i = 0; i < pEntry->numConns; i++)
        {
                MotionConn *conn = NULL;
-               MotionConnKey motion_conn_key;
-               MotionConnSentRecordTypmodEnt *motion_conn_ent;
-
                getMotionConn(pEntry, i, &conn);
 
                /* Initialize MotionConn entry. */
@@ -310,13 +307,7 @@ createChunkTransportState(ChunkTransportState * 
transportStates,
                conn->stopRequested = false;
                conn->cdbProc = NULL;
                conn->remapper = NULL;
-
-               motion_conn_key.mot_node_id = motNodeID;
-               motion_conn_key.conn_index = i;
-
-               motion_conn_ent = (MotionConnSentRecordTypmodEnt *) 
hash_search(transportStates->conn_sent_record_typmod,
-                                                                               
                                                                
&motion_conn_key, HASH_ENTER, NULL);
-               motion_conn_ent->sent_record_typmod = 0;
+               conn->sent_record_typmod = 0;
        }
 
        return pEntry;
@@ -544,6 +535,24 @@ GetMotionConnTupleRemapper(ChunkTransportState * 
transportStates,
        return conn->remapper;
 }
 
+
+int32 *
+GetMotionSentRecordTypmod(ChunkTransportState * transportStates,
+                                                  int16 motNodeID,
+                                                  int16 targetRoute)
+{
+       MotionConn *conn;
+       ChunkTransportStateEntry *pEntry = NULL;
+       getChunkTransportState(transportStates, motNodeID, &pEntry);
+
+       if (targetRoute == BROADCAST_SEGIDX)
+               conn = &pEntry->conns[0];
+       else
+               conn = &pEntry->conns[targetRoute];
+
+       return &conn->sent_record_typmod;
+}
+
 /*
  * do nothing for tcp/proxy implement.
  */
diff --git a/contrib/interconnect/ic_common.h b/contrib/interconnect/ic_common.h
index 14de6a09ce..b156a46fd6 100644
--- a/contrib/interconnect/ic_common.h
+++ b/contrib/interconnect/ic_common.h
@@ -137,6 +137,11 @@ extern TupleRemapper * 
GetMotionConnTupleRemapper(ChunkTransportState *transport
                int16 motNodeID,
                int16 targetRoute);
 
+int32 *
+GetMotionSentRecordTypmod(ChunkTransportState * transportStates,
+                                                                  int16 
motNodeID,
+                                                                  int16 
targetRoute);
+
 extern void DirectPutRxBufferTCP(ChunkTransportState *transportStates, int 
motNodeID, int route);
 extern uint32 GetActiveMotionConnsTCP(void);
 #endif
diff --git a/contrib/interconnect/ic_internal.h 
b/contrib/interconnect/ic_internal.h
index 2e12634d38..77c08ee6e2 100644
--- a/contrib/interconnect/ic_internal.h
+++ b/contrib/interconnect/ic_internal.h
@@ -86,6 +86,15 @@ typedef struct MotionConn
                                                                                
 * be longer than about 50 chars, but
                                                                                
 * play it safe */
 
+       /*
+        * used by the sender.
+        *
+        * the typmod of last sent record type in current connection,
+        * if the connection is for broadcasting then we only check
+        * and update this attribute on connection 0.
+        */
+       int32            sent_record_typmod;
+
        /*
         * used by the receiver.
         *
diff --git a/contrib/interconnect/ic_modules.c 
b/contrib/interconnect/ic_modules.c
index 1c30d5639e..b582e8bdbe 100644
--- a/contrib/interconnect/ic_modules.c
+++ b/contrib/interconnect/ic_modules.c
@@ -59,6 +59,7 @@ MotionIPCLayer tcp_ipc_layer = {
 #endif
 
     .GetMotionConnTupleRemapper = GetMotionConnTupleRemapper,
+    .GetMotionSentRecordTypmod = GetMotionSentRecordTypmod,
 };
 
 MotionIPCLayer proxy_ipc_layer = {
@@ -97,6 +98,7 @@ MotionIPCLayer proxy_ipc_layer = {
 #endif
 
     .GetMotionConnTupleRemapper = GetMotionConnTupleRemapper,
+    .GetMotionSentRecordTypmod = GetMotionSentRecordTypmod,
 };
 
 
@@ -136,6 +138,7 @@ MotionIPCLayer udpifc_ipc_layer = {
 #endif
 
     .GetMotionConnTupleRemapper = GetMotionConnTupleRemapper,
+    .GetMotionSentRecordTypmod = GetMotionSentRecordTypmod,
 };
 
 void
diff --git a/contrib/interconnect/tcp/ic_tcp.c 
b/contrib/interconnect/tcp/ic_tcp.c
index 284241d05f..39ad2f9c4e 100644
--- a/contrib/interconnect/tcp/ic_tcp.c
+++ b/contrib/interconnect/tcp/ic_tcp.c
@@ -1283,7 +1283,6 @@ SetupTCPInterconnect(EState *estate)
        /* we can have at most one of these. */
        ChunkTransportStateEntry *sendingChunkTransportState = NULL;
        ChunkTransportState *interconnect_context;
-       HASHCTL         conn_sent_record_typmod_ctl;
 
        SIMPLE_FAULT_INJECTOR("interconnect_setup_palloc");
        interconnect_context = palloc0(sizeof(ChunkTransportState));
@@ -1300,13 +1299,6 @@ SetupTCPInterconnect(EState *estate)
        interconnect_context->sliceTable = copyObject(sliceTable);
        interconnect_context->sliceId = sliceTable->localSlice;
 
-       conn_sent_record_typmod_ctl.keysize = sizeof(MotionConnKey);
-       conn_sent_record_typmod_ctl.entrysize = 
sizeof(MotionConnSentRecordTypmodEnt);
-       conn_sent_record_typmod_ctl.hcxt = CurrentMemoryContext;
-
-       interconnect_context->conn_sent_record_typmod = hash_create(
-                                                                               
                                                "MotionConn sent record typmod 
mapping", 128, &conn_sent_record_typmod_ctl, HASH_CONTEXT | HASH_ELEM | 
HASH_BLOBS);
-
 #ifdef ENABLE_IC_PROXY
        ic_proxy_backend_init_context(interconnect_context);
 #endif                                                 /* ENABLE_IC_PROXY */
@@ -2177,8 +2169,6 @@ TeardownTCPInterconnect(ChunkTransportState * 
transportStates, bool hasErrors)
 
        if (transportStates->states != NULL)
                pfree(transportStates->states);
-       if (transportStates->conn_sent_record_typmod)
-               hash_destroy(transportStates->conn_sent_record_typmod);
 
        pfree(transportStates);
 
diff --git a/contrib/interconnect/udp/ic_udpifc.c 
b/contrib/interconnect/udp/ic_udpifc.c
index d2fcd8cb45..63e8c9301d 100644
--- a/contrib/interconnect/udp/ic_udpifc.c
+++ b/contrib/interconnect/udp/ic_udpifc.c
@@ -3066,7 +3066,6 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
 
        ChunkTransportStateEntry *sendingChunkTransportState = NULL;
        ChunkTransportState *interconnect_context;
-       HASHCTL conn_sent_record_typmod_ctl;
 
        pthread_mutex_lock(&ic_control_info.lock);
 
@@ -3089,10 +3088,6 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
                ic_control_info.ic_instance_id = sliceTable->ic_instance_id;
        }
 
-       conn_sent_record_typmod_ctl.keysize = sizeof(MotionConnKey);
-       conn_sent_record_typmod_ctl.entrysize = 
sizeof(MotionConnSentRecordTypmodEnt);
-       conn_sent_record_typmod_ctl.hcxt = CurrentMemoryContext;
-
        interconnect_context = palloc0(sizeof(ChunkTransportState));
 
        /* initialize state variables */
@@ -3106,8 +3101,6 @@ SetupUDPIFCInterconnect_Internal(SliceTable *sliceTable)
        interconnect_context->incompleteConns = NIL;
        interconnect_context->sliceTable = copyObject(sliceTable);
        interconnect_context->sliceId = sliceTable->localSlice;
-       interconnect_context->conn_sent_record_typmod = hash_create(
-        "MotionConn sent record typmod mapping", 128, 
&conn_sent_record_typmod_ctl, HASH_CONTEXT | HASH_ELEM | HASH_BLOBS);
 
        mySlice = 
&interconnect_context->sliceTable->slices[sliceTable->localSlice];
 
@@ -3746,8 +3739,6 @@ TeardownUDPIFCInterconnect_Internal(ChunkTransportState 
*transportStates,
                        pfree(transportStates->states);
                        transportStates->states = NULL;
                }
-               if (transportStates->conn_sent_record_typmod)
-                       hash_destroy(transportStates->conn_sent_record_typmod);
                pfree(transportStates);
        }
 
diff --git a/src/backend/cdb/motion/cdbmotion.c 
b/src/backend/cdb/motion/cdbmotion.c
index ff6955282a..24b112cd1b 100644
--- a/src/backend/cdb/motion/cdbmotion.c
+++ b/src/backend/cdb/motion/cdbmotion.c
@@ -359,27 +359,10 @@ CheckAndSendRecordCache(MotionLayerState *mlStates,
        MotionNodeEntry *pMNEntry;
        TupleChunkListData tcList;
        MemoryContext oldCtxt;
-       bool sent_record_typmod_found = false;
-       MotionConnKey motion_conn_key;
-       MotionConnSentRecordTypmodEnt *motion_conn_ent;
+       int32 *conn_sent_record_typmod;
 
-       /*
-        * for broadcast we only mark sent_record_typmod for connection 0 for
-        * efficiency and convenience
-        */
-       motion_conn_key.mot_node_id = motNodeID;
-       motion_conn_key.conn_index = targetRoute == BROADCAST_SEGIDX ? 0 : 
targetRoute;
-
-       motion_conn_ent = (MotionConnSentRecordTypmodEnt 
*)hash_search(transportStates->conn_sent_record_typmod, 
-                       &motion_conn_key, HASH_FIND, &sent_record_typmod_found);
-
-       if (!sent_record_typmod_found) {
-               ereport(ERROR,
-                               (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
-                                errmsg("interconnect error: Unexpected Motion 
Node Id: %d, targetRoute: %d",
-                                               motNodeID, targetRoute),
-                                errdetail("Fail to get sent_record_typmod from 
motion conntion")));
-       }
+       conn_sent_record_typmod = 
CurrentMotionIPCLayer->GetMotionSentRecordTypmod(transportStates, motNodeID, 
targetRoute);
+       Assert(conn_sent_record_typmod);
 
        /*
         * Analyze tools.  Do not send any thing if this slice is in the bit 
mask
@@ -394,7 +377,7 @@ CheckAndSendRecordCache(MotionLayerState *mlStates,
         */
        pMNEntry = getMotionNodeEntry(mlStates, motNodeID);
 
-       if (!ShouldSendRecordCache(motion_conn_ent->sent_record_typmod, 
&pMNEntry->ser_tup_info))
+       if (!ShouldSendRecordCache(*conn_sent_record_typmod, 
&pMNEntry->ser_tup_info))
                return;
 
 #ifdef AMS_VERBOSE_LOGGING
@@ -404,7 +387,7 @@ CheckAndSendRecordCache(MotionLayerState *mlStates,
        /* Create and store the serialized form, and some stats about it. */
        oldCtxt = MemoryContextSwitchTo(mlStates->motion_layer_mctx);
 
-       SerializeRecordCacheIntoChunks(&pMNEntry->ser_tup_info, &tcList, 
motion_conn_ent->sent_record_typmod);
+       SerializeRecordCacheIntoChunks(&pMNEntry->ser_tup_info, &tcList, 
*conn_sent_record_typmod);
 
        MemoryContextSwitchTo(oldCtxt);
 
@@ -432,7 +415,7 @@ CheckAndSendRecordCache(MotionLayerState *mlStates,
        /* cleanup */
        clearTCList(&pMNEntry->ser_tup_info.chunkCache, &tcList);
 
-       UpdateSentRecordCache(&motion_conn_ent->sent_record_typmod);
+       UpdateSentRecordCache(conn_sent_record_typmod);
 }
 
 /*
diff --git a/src/include/cdb/cdbinterconnect.h 
b/src/include/cdb/cdbinterconnect.h
index eda5c59463..9f5ce7753d 100644
--- a/src/include/cdb/cdbinterconnect.h
+++ b/src/include/cdb/cdbinterconnect.h
@@ -178,18 +178,6 @@ struct ChunkTransportStateEntry;
 typedef struct MotionConn MotionConn;
 typedef struct ChunkTransportStateEntry ChunkTransportStateEntry;
 
-typedef struct MotionConnKey 
-{
-       int mot_node_id;
-       int conn_index;
-} MotionConnKey;
-
-typedef struct MotionConnSentRecordTypmodEnt
-{
-       MotionConnKey key;
-       int32 sent_record_typmod;
-} MotionConnSentRecordTypmodEnt;
-
 typedef struct ChunkTransportState
 {
        /* array of per-motion-node chunk transport state
@@ -223,17 +211,6 @@ typedef struct ChunkTransportState
        /* Estate pointer for this statement */
        struct EState *estate;
 
-       /*
-        * used by the sender.
-        *
-        * the typmod of last sent record type in current connection,
-        * if the connection is for broadcasting then we only check
-        * and update this attribute on connection 0.
-        * 
-        * mapping the MotionConn -> int32
-        */
-       HTAB* conn_sent_record_typmod;
-
        /* ic_proxy backend context */
        struct ICProxyBackendContext *proxyContext;
 } ChunkTransportState;
diff --git a/src/include/cdb/ml_ipc.h b/src/include/cdb/ml_ipc.h
index a786a1c173..b39eaac60a 100644
--- a/src/include/cdb/ml_ipc.h
+++ b/src/include/cdb/ml_ipc.h
@@ -285,6 +285,10 @@ typedef struct MotionIPCLayer
                int16 motNodeID,
                int16 targetRoute);
 
+       int32 *(*GetMotionSentRecordTypmod)  (ChunkTransportState 
*transportStates,
+               int16 motNodeID,
+               int16 targetRoute);
+
 } MotionIPCLayer;
 
 /* MotionIPCLayer selected */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org

Reply via email to