This is an automated email from the ASF dual-hosted git repository.

maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git


The following commit(s) were added to refs/heads/main by this push:
     new acec354bf77 In resource-constrained environments, 10TB-scale 
8-parallel processing in cloudberry may encounter specific anomalies related to 
Motion layer UDP communication. Below are four key scenarios and how the code 
modifications address them.
acec354bf77 is described below

commit acec354bf776892ecb11644142630ec9447a5f80
Author: Zhao Xi <[email protected]>
AuthorDate: Thu Nov 13 10:54:40 2025 +0800

    In resource-constrained environments, 10TB-scale 8-parallel processing in 
cloudberry may encounter specific anomalies related to Motion layer UDP 
communication. Below are four key scenarios and how the code modifications 
address them.
    
    Four Anomaly Scenarios
    
    1. Capacity Mismatch:
       The receiving end’s buffer becomes full, but the sender is unaware. As a 
result, the sender’s unacknowledged packet queue continues transmitting, 
leading to unnecessary retransmissions and packet drops.
    
    2. False Deadlock Detection:
       The peer node processes heartbeat packets but fails to free up buffer 
capacity. This triggers a false deadlock judgment, incorrectly flagging network 
anomalies.
    
    3. Unprocessed Packets Require Main Thread Wakeup:
       When the receive queue is full, incoming data packets are discarded. 
However, the main thread still needs to be awakened to process backlogged 
packets in the queue, preventing permanent stalling.
    
    4. Execution Time Mismatch Across Nodes:
       Issues like data skew, computational performance gaps, or I/O 
bottlenecks cause significant differences in execution time between nodes. For 
example, in a hash join, if the inner table’s  is not ready, the node cannot 
process data from other nodes, leading to packet timeouts.
       *Example Plan*: Packets from  to  (via ) timeout because the  in  
remains unready, blocking packet processing.
    
    Code Modifications and Their Impact
    
    The code changes target the above scenarios by enhancing UDP communication 
feedback, adjusting deadlock checks, and ensuring proper thread wakeup. Key 
modifications:
    
    1. Addressing Capacity Mismatch:
       - Added  (256) to flag when the receive buffer is full.
       - When the receive queue is full (), a response with  is sent to the 
sender (). This notifies the sender to pause or adjust transmission, preventing 
blind retransmissions.
    
    2. Fixing False Deadlock Detection:
       - Modified  to accept  as a parameter, enabling ACK polling during 
deadlock checks.
       - Extended the initial timeout for deadlock suspicion from  to 600 
seconds, reducing premature network error reports.
       - If no response is received after 600 seconds, the buffer capacity is 
incrementally increased () to alleviate false bottlenecks, with detailed 
logging before triggering an error.
    
    3. Ensuring Main Thread Wakeup on Full Queue:
       - In , even when packets are dropped due to a full queue, the main 
thread is awakened () if the packet matches the waiting query/node/route. This 
ensures backlogged packets in the queue are processed.
    
    4. Mitigating Node Execution Mismatches:
       - Added logging for retransmissions after  attempts, providing 
visibility into prolonged packet delays (e.g., due to unready ).
       - Reset  after successful ACK polling, preventing excessive retry counts 
from triggering false timeouts.
---
 contrib/interconnect/udp/ic_udpifc.c | 79 ++++++++++++++++++++++++++++++------
 1 file changed, 67 insertions(+), 12 deletions(-)

diff --git a/contrib/interconnect/udp/ic_udpifc.c 
b/contrib/interconnect/udp/ic_udpifc.c
index 548f71116c0..d11e4577cd6 100644
--- a/contrib/interconnect/udp/ic_udpifc.c
+++ b/contrib/interconnect/udp/ic_udpifc.c
@@ -209,6 +209,7 @@ int
 #define UDPIC_FLAGS_DISORDER                   (32)
 #define UDPIC_FLAGS_DUPLICATE                  (64)
 #define UDPIC_FLAGS_CAPACITY                   (128)
+#define UDPIC_FLAGS_FULL                       (256)
 
 /*
  * ConnHtabBin
@@ -835,7 +836,7 @@ static void initUdpManager(mudp_manager_t mptr);
 static inline void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool 
*networkTimeoutIsLogged);
 
 static void checkExpiration(ChunkTransportState *transportStates, 
ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now);
-static void checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn 
*conn);
+static void checkDeadlock(ChunkTransportState *transportStates, 
ChunkTransportStateEntry *pChunkEntry, MotionConn *conn);
 
 static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer, 
int peer_len);
 static void cleanupStartupCache(void);
@@ -5220,6 +5221,12 @@ handleAcks(ChunkTransportState *transportStates, 
ChunkTransportStateEntry *pChun
                                        shouldSendBuffers |= 
(handleAckForDisorderPkt(transportStates, &pEntry->entry, &ackConn->mConn, 
pkt));
                                        break;
                                }
+                               else if (pkt->flags & UDPIC_FLAGS_FULL)
+                               {
+                                       if (DEBUG1 >= log_min_messages)
+                                               write_log("Recv buff is full 
[seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d", 
pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags, 
ackConn->conn_info.seq);
+                                       break;
+                               }
 
                                /*
                                 * don't get out of the loop if pkt->seq equals 
to
@@ -6099,6 +6106,7 @@ checkExpiration(ChunkTransportState *transportStates,
                                                if (pollAcks(transportStates, 
pEntryUdp->txfd, wait_time))
                                                {
                                                        
handleAcks(transportStates, pEntry, false);
+                                                       curBuf->nRetry = 0;
                                                        break;
                                                }
 
@@ -6120,6 +6128,12 @@ checkExpiration(ChunkTransportState *transportStates,
                                        };
                                }
 
+                               if (loop_ack > 
Gp_interconnect_min_retries_before_timeout / 5)
+                                       write_log("Resending packet (seq %d) to 
%s (pid %d cid %d) with %d retries in %lu seconds",
+                                                       curBuf->pkt->seq, 
curBuf->conn->remoteHostAndPort,
+                                                       curBuf->pkt->dstPid, 
curBuf->pkt->dstContentId, curBuf->nRetry,
+                                                       (now - 
curBuf->sentTime) / 1000 / 1000);
+
                                currBuffConn = CONTAINER_OF(curBuf->conn, 
MotionConnUDP, mConn);
 
                                retransmits++;
@@ -6214,7 +6228,7 @@ checkExpiration(ChunkTransportState *transportStates,
  *
  */
 static void
-checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn)
+checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry 
*pChunkEntry, MotionConn *mConn)
 {
        uint64          deadlockCheckTime;
        ChunkTransportStateEntryUDP *pEntry = NULL;
@@ -6251,17 +6265,31 @@ checkDeadlock(ChunkTransportStateEntry *pChunkEntry, 
MotionConn *mConn)
                        ic_control_info.lastDeadlockCheckTime = now;
                        ic_statistics.statusQueryMsgNum++;
 
+                       if (Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE && pollAcks(transportStates, pEntry->txfd, 
50))
+                       {
+                               handleAcks(transportStates, pChunkEntry, false);
+                               conn->deadlockCheckBeginTime = now;
+                       }
+
                        /* check network error. */
-                       if ((now - conn->deadlockCheckBeginTime) > ((uint64) 
Gp_interconnect_transmit_timeout * 1000 * 1000))
+                       if ((now - conn->deadlockCheckBeginTime) > ((uint64) 
Gp_interconnect_transmit_timeout * 100 * 1000))
                        {
-                               ereport(ERROR,
-                                               
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
-                                                errmsg("interconnect 
encountered a network error, please check your network"),
-                                                errdetail("Did not get any 
response from %s (pid %d cid %d) in %d seconds.",
-                                                                  
conn->mConn.remoteHostAndPort,
-                                                                  
conn->conn_info.dstPid,
-                                                                  
conn->conn_info.dstContentId,
-                                                                  
Gp_interconnect_transmit_timeout)));
+                               write_log("Did not get any response from %s 
(pid %d cid %d) in 600 seconds.",conn->mConn.remoteHostAndPort,
+                                                                         
conn->conn_info.dstPid,
+                                                                         
conn->conn_info.dstContentId);
+
+                               if (Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_TIMER)
+                                       conn->capacity += 1;
+
+                               if ((now - conn->deadlockCheckBeginTime) > 
((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000))
+                                       ereport(ERROR,
+                                                       
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+                                                       errmsg("interconnect 
encountered a network error, please check your network"),
+                                                       errdetail("Did not get 
any response from %s (pid %d cid %d) in %d seconds.",
+                                                                       
conn->mConn.remoteHostAndPort,
+                                                                       
conn->conn_info.dstPid,
+                                                                       
conn->conn_info.dstContentId,
+                                                                       
Gp_interconnect_transmit_timeout)));
                        }
                }
        }
@@ -6393,7 +6421,7 @@ checkExceptions(ChunkTransportState *transportStates,
 
        if ((retry & 0x3) == 2)
        {
-               checkDeadlock(pEntry, conn);
+               checkDeadlock(transportStates, pEntry, conn);
                checkRxThreadError();
                ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
        }
@@ -6543,6 +6571,9 @@ SendChunkUDPIFC(ChunkTransportState *transportStates,
                }
                checkExceptions(transportStates, &pEntry->entry, &conn->mConn, 
retry++, timeout);
                doCheckExpiration = false;
+
+               if (!doCheckExpiration && icBufferListLength(&conn->unackQueue) 
== 0 && conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0)
+                       sendBuffers(transportStates, &pEntry->entry, 
&conn->mConn);
        }
 
        conn->mConn.pBuff = (uint8 *) conn->curBuff->pkt;
@@ -7136,6 +7167,30 @@ handleDataPacket(MotionConn *mConn, icpkthdr *pkt, 
struct sockaddr_storage *peer
                logPkt("Interconnect error: received a packet when the queue is 
full ", pkt);
                ic_statistics.disorderedPktNum++;
                conn->stat_count_dropped++;
+
+               if (Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_TIMER && rx_control_info.mainWaitingState.waiting &&
+                               rx_control_info.mainWaitingState.waitingNode == 
pkt->motNodeId &&
+                               rx_control_info.mainWaitingState.waitingQuery 
== pkt->icId)
+               {
+                               if 
(rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE)
+                               {
+                                       if 
(rx_control_info.mainWaitingState.reachRoute == ANY_ROUTE)
+                                               
rx_control_info.mainWaitingState.reachRoute = conn->route;
+                               }
+                               else if 
(rx_control_info.mainWaitingState.waitingRoute == conn->route)
+                               {
+                                       if (DEBUG2 >= log_min_messages)
+                                               write_log("rx thread: 
main_waiting waking it route %d", 
rx_control_info.mainWaitingState.waitingRoute);
+                                       
rx_control_info.mainWaitingState.reachRoute = conn->route;
+                               }
+                               /* WAKE MAIN THREAD HERE */
+                               *wakeup_mainthread = true;
+               }
+
+               if (Gp_interconnect_fc_method == 
INTERCONNECT_FC_METHOD_LOSS_ADVANCE)
+               {
+                       setAckSendParam(param, &conn->mConn, UDPIC_FLAGS_FULL, 
conn->conn_info.seq - 1, conn->conn_info.extraSeq);
+               }
                return false;
        }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to