This is an automated email from the ASF dual-hosted git repository.
maxyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
new acec354bf77 In resource-constrained environments, 10TB-scale
8-parallel processing in cloudberry may encounter specific anomalies related to
Motion layer UDP communication. Below are four key scenarios and how the code
modifications address them.
acec354bf77 is described below
commit acec354bf776892ecb11644142630ec9447a5f80
Author: Zhao Xi <[email protected]>
AuthorDate: Thu Nov 13 10:54:40 2025 +0800
In resource-constrained environments, 10TB-scale 8-parallel processing in
cloudberry may encounter specific anomalies related to Motion layer UDP
communication. Below are four key scenarios and how the code modifications
address them.
Four Anomaly Scenarios
1. Capacity Mismatch:
The receiving end’s buffer becomes full, but the sender is unaware. As a
result, the sender’s unacknowledged packet queue continues transmitting,
leading to unnecessary retransmissions and packet drops.
2. False Deadlock Detection:
The peer node processes heartbeat packets but fails to free up buffer
capacity. This triggers a false deadlock judgment, incorrectly flagging network
anomalies.
3. Unprocessed Packets Require Main Thread Wakeup:
When the receive queue is full, incoming data packets are discarded.
However, the main thread still needs to be awakened to process backlogged
packets in the queue, preventing permanent stalling.
4. Execution Time Mismatch Across Nodes:
Issues like data skew, computational performance gaps, or I/O
bottlenecks cause significant differences in execution time between nodes. For
example, in a hash join, if the inner table’s hash table is not ready, the node cannot
process data from other nodes, leading to packet timeouts.
*Example Plan*: Packets sent from one slice to another (via a Motion node) time out
because the Hash node in the receiving slice remains unready, blocking packet processing.
Code Modifications and Their Impact
The code changes target the above scenarios by enhancing UDP communication
feedback, adjusting deadlock checks, and ensuring proper thread wakeup. Key
modifications:
1. Addressing Capacity Mismatch:
- Added `UDPIC_FLAGS_FULL` (256) to flag when the receive buffer is full.
- When the receive queue is full (detected in `handleDataPacket`), a response with
`UDPIC_FLAGS_FULL` is sent to the sender (via `setAckSendParam`). This notifies the
sender to pause or adjust transmission, preventing blind retransmissions.
2. Fixing False Deadlock Detection:
- Modified `checkDeadlock` to accept `transportStates` as a parameter, enabling ACK
polling during deadlock checks.
- Extended the initial timeout for deadlock suspicion to 600
seconds, reducing premature network error reports.
- If no response is received after 600 seconds, the buffer capacity is
incrementally increased (`conn->capacity += 1`) to alleviate false bottlenecks, with
detailed logging before triggering an error.
3. Ensuring Main Thread Wakeup on Full Queue:
- In `handleDataPacket`, even when packets are dropped due to a full queue, the main
thread is awakened (`*wakeup_mainthread = true`) if the packet matches the waiting
query/node/route. This ensures backlogged packets in the queue are processed.
4. Mitigating Node Execution Mismatches:
- Added logging for retransmissions after `Gp_interconnect_min_retries_before_timeout / 5`
attempts, providing visibility into prolonged packet delays (e.g., due to an unready
hash table).
- Reset `curBuf->nRetry` after successful ACK polling, preventing excessive retry counts
from triggering false timeouts.
---
contrib/interconnect/udp/ic_udpifc.c | 79 ++++++++++++++++++++++++++++++------
1 file changed, 67 insertions(+), 12 deletions(-)
diff --git a/contrib/interconnect/udp/ic_udpifc.c
b/contrib/interconnect/udp/ic_udpifc.c
index 548f71116c0..d11e4577cd6 100644
--- a/contrib/interconnect/udp/ic_udpifc.c
+++ b/contrib/interconnect/udp/ic_udpifc.c
@@ -209,6 +209,7 @@ int
#define UDPIC_FLAGS_DISORDER (32)
#define UDPIC_FLAGS_DUPLICATE (64)
#define UDPIC_FLAGS_CAPACITY (128)
+#define UDPIC_FLAGS_FULL (256)
/*
* ConnHtabBin
@@ -835,7 +836,7 @@ static void initUdpManager(mudp_manager_t mptr);
static inline void checkNetworkTimeout(ICBuffer *buf, uint64 now, bool
*networkTimeoutIsLogged);
static void checkExpiration(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pEntry, MotionConn *triggerConn, uint64 now);
-static void checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn
*conn);
+static void checkDeadlock(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pChunkEntry, MotionConn *conn);
static bool cacheFuturePacket(icpkthdr *pkt, struct sockaddr_storage *peer,
int peer_len);
static void cleanupStartupCache(void);
@@ -5220,6 +5221,12 @@ handleAcks(ChunkTransportState *transportStates,
ChunkTransportStateEntry *pChun
shouldSendBuffers |=
(handleAckForDisorderPkt(transportStates, &pEntry->entry, &ackConn->mConn,
pkt));
break;
}
+ else if (pkt->flags & UDPIC_FLAGS_FULL)
+ {
+ if (DEBUG1 >= log_min_messages)
+ write_log("Recv buff is full
[seq %d] from route %d; srcpid %d dstpid %d cmd %d flags 0x%x connseq %d",
pkt->seq, ackConn->route, pkt->srcPid, pkt->dstPid, pkt->icId, pkt->flags,
ackConn->conn_info.seq);
+ break;
+ }
/*
* don't get out of the loop if pkt->seq equals
to
@@ -6099,6 +6106,7 @@ checkExpiration(ChunkTransportState *transportStates,
if (pollAcks(transportStates,
pEntryUdp->txfd, wait_time))
{
handleAcks(transportStates, pEntry, false);
+ curBuf->nRetry = 0;
break;
}
@@ -6120,6 +6128,12 @@ checkExpiration(ChunkTransportState *transportStates,
};
}
+ if (loop_ack >
Gp_interconnect_min_retries_before_timeout / 5)
+ write_log("Resending packet (seq %d) to
%s (pid %d cid %d) with %d retries in %lu seconds",
+ curBuf->pkt->seq,
curBuf->conn->remoteHostAndPort,
+ curBuf->pkt->dstPid,
curBuf->pkt->dstContentId, curBuf->nRetry,
+ (now -
curBuf->sentTime) / 1000 / 1000);
+
currBuffConn = CONTAINER_OF(curBuf->conn,
MotionConnUDP, mConn);
retransmits++;
@@ -6214,7 +6228,7 @@ checkExpiration(ChunkTransportState *transportStates,
*
*/
static void
-checkDeadlock(ChunkTransportStateEntry *pChunkEntry, MotionConn *mConn)
+checkDeadlock(ChunkTransportState *transportStates, ChunkTransportStateEntry
*pChunkEntry, MotionConn *mConn)
{
uint64 deadlockCheckTime;
ChunkTransportStateEntryUDP *pEntry = NULL;
@@ -6251,17 +6265,31 @@ checkDeadlock(ChunkTransportStateEntry *pChunkEntry,
MotionConn *mConn)
ic_control_info.lastDeadlockCheckTime = now;
ic_statistics.statusQueryMsgNum++;
+ if (Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE && pollAcks(transportStates, pEntry->txfd,
50))
+ {
+ handleAcks(transportStates, pChunkEntry, false);
+ conn->deadlockCheckBeginTime = now;
+ }
+
/* check network error. */
- if ((now - conn->deadlockCheckBeginTime) > ((uint64)
Gp_interconnect_transmit_timeout * 1000 * 1000))
+ if ((now - conn->deadlockCheckBeginTime) > ((uint64)
Gp_interconnect_transmit_timeout * 100 * 1000))
{
- ereport(ERROR,
-
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
- errmsg("interconnect
encountered a network error, please check your network"),
- errdetail("Did not get any
response from %s (pid %d cid %d) in %d seconds.",
-
conn->mConn.remoteHostAndPort,
-
conn->conn_info.dstPid,
-
conn->conn_info.dstContentId,
-
Gp_interconnect_transmit_timeout)));
+ write_log("Did not get any response from %s
(pid %d cid %d) in 600 seconds.",conn->mConn.remoteHostAndPort,
+
conn->conn_info.dstPid,
+
conn->conn_info.dstContentId);
+
+ if (Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_TIMER)
+ conn->capacity += 1;
+
+ if ((now - conn->deadlockCheckBeginTime) >
((uint64) Gp_interconnect_transmit_timeout * 1000 * 1000))
+ ereport(ERROR,
+
(errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+ errmsg("interconnect
encountered a network error, please check your network"),
+ errdetail("Did not get
any response from %s (pid %d cid %d) in %d seconds.",
+
conn->mConn.remoteHostAndPort,
+
conn->conn_info.dstPid,
+
conn->conn_info.dstContentId,
+
Gp_interconnect_transmit_timeout)));
}
}
}
@@ -6393,7 +6421,7 @@ checkExceptions(ChunkTransportState *transportStates,
if ((retry & 0x3) == 2)
{
- checkDeadlock(pEntry, conn);
+ checkDeadlock(transportStates, pEntry, conn);
checkRxThreadError();
ML_CHECK_FOR_INTERRUPTS(transportStates->teardownActive);
}
@@ -6543,6 +6571,9 @@ SendChunkUDPIFC(ChunkTransportState *transportStates,
}
checkExceptions(transportStates, &pEntry->entry, &conn->mConn,
retry++, timeout);
doCheckExpiration = false;
+
+ if (!doCheckExpiration && icBufferListLength(&conn->unackQueue)
== 0 && conn->capacity > 0 && icBufferListLength(&conn->sndQueue) > 0)
+ sendBuffers(transportStates, &pEntry->entry,
&conn->mConn);
}
conn->mConn.pBuff = (uint8 *) conn->curBuff->pkt;
@@ -7136,6 +7167,30 @@ handleDataPacket(MotionConn *mConn, icpkthdr *pkt,
struct sockaddr_storage *peer
logPkt("Interconnect error: received a packet when the queue is
full ", pkt);
ic_statistics.disorderedPktNum++;
conn->stat_count_dropped++;
+
+ if (Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_TIMER && rx_control_info.mainWaitingState.waiting &&
+ rx_control_info.mainWaitingState.waitingNode ==
pkt->motNodeId &&
+ rx_control_info.mainWaitingState.waitingQuery
== pkt->icId)
+ {
+ if
(rx_control_info.mainWaitingState.waitingRoute == ANY_ROUTE)
+ {
+ if
(rx_control_info.mainWaitingState.reachRoute == ANY_ROUTE)
+
rx_control_info.mainWaitingState.reachRoute = conn->route;
+ }
+ else if
(rx_control_info.mainWaitingState.waitingRoute == conn->route)
+ {
+ if (DEBUG2 >= log_min_messages)
+ write_log("rx thread:
main_waiting waking it route %d",
rx_control_info.mainWaitingState.waitingRoute);
+
rx_control_info.mainWaitingState.reachRoute = conn->route;
+ }
+ /* WAKE MAIN THREAD HERE */
+ *wakeup_mainthread = true;
+ }
+
+ if (Gp_interconnect_fc_method ==
INTERCONNECT_FC_METHOD_LOSS_ADVANCE)
+ {
+ setAckSendParam(param, &conn->mConn, UDPIC_FLAGS_FULL,
conn->conn_info.seq - 1, conn->conn_info.extraSeq);
+ }
return false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]