oracleloyall commented on code in PR #1365:
URL: https://github.com/apache/cloudberry/pull/1365#discussion_r2367163521


##########
contrib/interconnect/udp/ic_udpifc.c:
##########
@@ -924,6 +993,353 @@ dumpTransProtoStats()
 
 #endif                                                 /* 
TRANSFER_PROTOCOL_STATS */
 
+/*
+ * initRTOHashstore
+ *       Allocate the retransmission-timeout (RTO) store and make every
+ *       list in it empty.
+ *
+ * The store holds RTO_HASH + 1 list heads: indices 0 .. RTO_HASH - 1 are
+ * the hashed slots and index RTO_HASH is the extra list that
+ * addtoRTOList() uses for deadlines too far ahead to hash.
+ *
+ * palloc() reports an out-of-memory condition with ereport(ERROR) instead
+ * of returning NULL, so the result is used without a NULL check.
+ *
+ * Returns the newly allocated store.
+ */
+static struct rto_hashstore *
+initRTOHashstore(void)
+{
+       struct rto_hashstore *hs = palloc(sizeof(*hs));
+       int                     i;
+
+       /* palloc() does not zero memory: give the cursor defined values. */
+       hs->rto_now_idx = 0;
+       hs->rto_now_ts = 0;
+
+       /* "<=" on purpose: the list at index RTO_HASH is initialised too. */
+       for (i = 0; i <= RTO_HASH; i++)
+               TAILQ_INIT(&hs->rto_list[i]);
+
+       return hs;
+}
+
+/*
+ * initUdpManager
+ *       Reset a UDP manager to its starting state: a freshly built RTO
+ *       store, no streams counted on the RTO lists, and a current
+ *       timestamp of zero.
+ */
+static void
+initUdpManager(mudp_manager_t mudp)
+{
+       struct rto_hashstore *fresh_store = initRTOHashstore();
+
+       mudp->rto_store = fresh_store;
+
+       /* Nothing is scheduled yet and the manager clock starts at zero. */
+       mudp->rto_list_cnt = 0;
+       mudp->cur_ts = 0;
+}
+
+/*
+ * addtoRTOList
+ *       Link cur_stream onto the RTO list that matches its
+ *       sndvar.ts_rto deadline.
+ *
+ * If no stream is currently counted (rto_list_cnt == 0) the store cursor
+ * is restarted first: rto_now_idx becomes 0 and rto_now_ts becomes this
+ * stream's deadline.
+ *
+ * Nothing is linked when the stream is already on an RTO list
+ * (on_rto_idx >= 0) or is flagged on_timewait_list.
+ *
+ * Otherwise the distance between the deadline and rto_now_ts selects the
+ * list:
+ *   - distance >= RTO_HASH: the extra list at index RTO_HASH;
+ *   - otherwise: slot (distance + rto_now_idx) % RTO_HASH.
+ *
+ * A deadline that lies before rto_now_ts gives a negative distance.  In
+ * C the result of '%' takes the sign of the dividend, so using that value
+ * unchanged could produce a negative slot index and touch memory in front
+ * of rto_list[].  Such deadlines are therefore placed in the current slot
+ * (distance treated as 0).
+ */
+static inline void
+addtoRTOList(mudp_manager_t mudp, MotionConnUDP *cur_stream)
+{
+       int                     diff;
+       int                     offset;
+
+       if (!mudp->rto_list_cnt)
+       {
+               mudp->rto_store->rto_now_idx = 0;
+               mudp->rto_store->rto_now_ts = cur_stream->sndvar.ts_rto;
+       }
+
+       /* Already linked on an RTO list: leave it where it is. */
+       if (cur_stream->on_rto_idx >= 0)
+               return;
+
+       if (cur_stream->on_timewait_list)
+               return;
+
+       diff = (int32_t) (cur_stream->sndvar.ts_rto - mudp->rto_store->rto_now_ts);
+
+       if (diff < RTO_HASH)
+       {
+               /* Deadline already behind the cursor: use the current slot. */
+               if (diff < 0)
+                       diff = 0;
+
+               offset = (diff + mudp->rto_store->rto_now_idx) % RTO_HASH;
+       }
+       else
+       {
+               /* Too far ahead to hash: park it on the extra list. */
+               offset = RTO_HASH;
+       }
+
+       cur_stream->on_rto_idx = offset;
+       TAILQ_INSERT_TAIL(&(mudp->rto_store->rto_list[offset]),
+                                       cur_stream, sndvar.timer_link);
+       mudp->rto_list_cnt++;
+}
+
+/*
+ * removeFromRTOList
+ *       Unlink cur_stream from the RTO list recorded in its on_rto_idx.
+ *
+ * A negative on_rto_idx means the stream is not on any RTO list and the
+ * call does nothing.  Otherwise the stream is removed from
+ * rto_list[on_rto_idx], on_rto_idx is set back to -1 and the manager's
+ * rto_list_cnt is decremented.
+ */
+static inline void
+removeFromRTOList(mudp_manager_t mudp,
+                               MotionConnUDP *cur_stream)
+{
+       if (cur_stream->on_rto_idx >= 0)
+       {
+               struct rto_head *bucket =
+                       &mudp->rto_store->rto_list[cur_stream->on_rto_idx];
+
+               TAILQ_REMOVE(bucket, cur_stream, sndvar.timer_link);
+
+               mudp->rto_list_cnt--;
+               cur_stream->on_rto_idx = -1;
+       }
+}
+
+/*
+ * updateRetransmissionTimer
+ *       Restart the retransmission timer of cur_stream relative to cur_ts.
+ *
+ * The retransmit counter (sndvar.nrtx) is cleared and the stream is taken
+ * off its current RTO list, if it is on one.  A new deadline of
+ * cur_ts + sndvar.rto is then stored in sndvar.ts_rto and the stream is
+ * handed to addtoRTOList().
+ */
+static inline void
+updateRetransmissionTimer(mudp_manager_t mudp,
+                               MotionConnUDP *cur_stream,
+                               uint32_t cur_ts)
+{
+       cur_stream->sndvar.nrtx = 0;
+
+       /* removeFromRTOList() is a no-op for a stream that is not listed. */
+       removeFromRTOList(mudp, cur_stream);
+
+       /* Data sent but not yet acknowledged: schedule a fresh deadline. */
+       if (UDP_SEQ_GT(cur_stream->snd_nxt, cur_stream->sndvar.snd_una))
+       {
+               cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto;
+               addtoRTOList(mudp, cur_stream);
+       }
+
+       /*
+        * Still not on any list: schedule it as well.
+        *
+        * NOTE(review): this branch also runs when nothing is unacknowledged,
+        * so the condition above does not appear to restrict when the timer
+        * is armed -- confirm that this is intended.
+        */
+       if (cur_stream->on_rto_idx == -1)
+       {
+               cur_stream->sndvar.ts_rto = cur_ts + cur_stream->sndvar.rto;
+               addtoRTOList(mudp, cur_stream);
+       }
+}
+
+/*
+ * handleRTO
+ *       Handle an expired retransmission timer: take cur_stream off its
+ *       RTO list, then resend every buffer found in the expired slots of
+ *       unack_queue_ring.
+ *
+ * While cur_ts is at least unack_queue_ring.currentTime + TIMER_SPAN (and
+ * for at most UNACK_QUEUE_RING_SLOTS_NUM iterations), every buffer popped
+ * from the current ring slot:
+ *   - has its nRetry incremented,
+ *   - is put back into the ring with the period returned by
+ *     computeExpirationPeriod(),
+ *   - is sent again with sendOnce(),
+ *   - updates the global and per-connection resend statistics,
+ *   - is passed to checkNetworkTimeout().
+ * After a slot is drained the ring advances by one slot and TIMER_SPAN.
+ *
+ * triggerConn is not referenced in this function.
+ *
+ * Always returns 0.
+ */
+static int
+handleRTO(mudp_manager_t mudp,
+                               uint32_t cur_ts,
+                               MotionConnUDP *cur_stream,
+                               ChunkTransportState *transportStates,
+                               ChunkTransportStateEntry *pEntry,
+                               MotionConn *triggerConn)
+{
+       int                     count = 0;
+       MotionConnUDP *currBuffConn = NULL;
+       uint32_t        now = cur_ts;
+
+       Assert(unack_queue_ring.currentTime != 0);
+       removeFromRTOList(mudp, cur_stream);
+
+       /* check for expiration */
+       while (now >= (unack_queue_ring.currentTime + TIMER_SPAN) &&
+                  count++ < UNACK_QUEUE_RING_SLOTS_NUM)
+       {
+               /* expired, need to resend them */
+               ICBuffer   *curBuf = NULL;
+
+               while ((curBuf = icBufferListPop(&unack_queue_ring.slots[unack_queue_ring.idx])) != NULL)
+               {
+                       curBuf->nRetry++;
+
+                       /* Re-queue before sending, with the period for the new retry count. */
+                       putIntoUnackQueueRing(&unack_queue_ring,
+                                                                 curBuf,
+                                                                 computeExpirationPeriod(curBuf->conn, curBuf->nRetry),
+                                                                 now);
+
+#ifdef TRANSFER_PROTOCOL_STATS
+                       updateStats(TPE_DATA_PKT_SEND, curBuf->conn, curBuf->pkt);
+#endif
+
+                       sendOnce(transportStates, pEntry, curBuf, curBuf->conn);
+
+                       currBuffConn = CONTAINER_OF(curBuf->conn, MotionConnUDP, mConn);
+
+                       ic_statistics.retransmits++;
+                       currBuffConn->stat_count_resent++;
+                       currBuffConn->stat_max_resent = Max(currBuffConn->stat_max_resent,
+                                                                                               currBuffConn->stat_count_resent);
+                       checkNetworkTimeout(curBuf, now, &transportStates->networkTimeoutIsLogged);
+
+#ifdef AMS_VERBOSE_LOGGING
+                       write_log("RESEND pkt with seq %d (retry %d, rtt " UINT64_FORMAT ") to route %d",
+                                 curBuf->pkt->seq, curBuf->nRetry, curBuf->conn->rtt, curBuf->conn->route);
+                       logPkt("RESEND PKT in checkExpiration", curBuf->pkt);
+#endif
+               }
+
+               /* Slot drained: move the ring forward by one slot. */
+               unack_queue_ring.currentTime += TIMER_SPAN;
+               unack_queue_ring.idx = (unack_queue_ring.idx + 1) % (UNACK_QUEUE_RING_SLOTS_NUM);
+       }
+
+       return 0;
+}
+
+static inline void
+rearrangeRTOStore(mudp_manager_t mudp)
+{
+       MotionConnUDP *walk, *next;
+       struct rto_head* rto_list = &mudp->rto_store->rto_list[RTO_HASH];
+       int cnt = 0;
+
+       for (walk = TAILQ_FIRST(rto_list); walk != NULL; walk = next)
+       {
+               next = TAILQ_NEXT(walk, sndvar.timer_link);
+
+               int diff = (int32_t)(mudp->rto_store->rto_now_ts - 
walk->sndvar.ts_rto);

Review Comment:
   enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cloudberry.apache.org
For additional commands, e-mail: commits-h...@cloudberry.apache.org

Reply via email to