yanglimingcn commented on code in PR #3290:
URL: https://github.com/apache/brpc/pull/3290#discussion_r3258568951


##########
src/brpc/ubshm/ub_ring.cpp:
##########
@@ -0,0 +1,1083 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <iostream>
+#include <gflags/gflags.h>
+#include <unistd.h>
+#include <ctime>
+#include "bthread/bthread.h"
+#include "butil/logging.h"
+#include "brpc/ubshm/ub_ring.h"
+#include "brpc/ubshm/ub_ring_manager.h"
+#include "brpc/ubshm/shm/shm_ipc.h"
+
+namespace brpc {
+namespace ubring {
+uint32_t g_sleepTime[UBR_TASK_STEP_NUM] = {0};
+#define TIME_COVERSION 1000
+DEFINE_int32(ub_disconnect_timeout, 5, "Ubshm disconnection timeout.");
+DEFINE_int32(ub_connect_timeout, 1, "Ubshm connection timeout.");
+DEFINE_int32(ub_hb_timer_interval, 5, "Heartbeat timer interval.");
+DEFINE_int32(ub_hb_retry_cnt, 10, "Heartbeat retry times.");
+DEFINE_int32(ub_event_queue_timer_interval, 100, "Interval of the 
disconnection timer.");
+
+UBRing::UBRing()
+{}
+UBRing::~UBRing()
+{}
+
+RETURN_CODE UBRing::UbrTrxMapShm(SHM *localShm, SHM *remoteShm)
+{
+    RETURN_CODE rc = UbrTrxMapLocalShm(localShm);
+    if (UNLIKELY(rc != UBRING_OK)) {
+        LOG(ERROR) << "Trx map local shared memory failed.";
+        return rc;
+    }
+    rc = UbrTrxMapRemoteShm(remoteShm);
+    if (UNLIKELY(rc != UBRING_OK)) {
+        LOG(ERROR) << "Trx map remote shared memory failed.";
+        return rc;
+    }
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRing::UbrTrxClose() {
+    RETURN_CODE closeCheckRc = UbrTrxCloseCheck(_trx);
+    if (UNLIKELY(closeCheckRc != UBRING_OK)) {
+        if (closeCheckRc == UBRING_REENTRY) {
+            LOG(INFO) << "Trx close skipped, already closing, local name=" << 
_trx->localShm.name;
+            return UBRING_OK;
+        }
+        return UBRING_ERR;
+    }
+    if (_trx->ubrRx.remoteTxEventQ.addr != nullptr) {
+        ((UbrEventQMsg *)_trx->ubrRx.remoteTxEventQ.addr)->flag = 
UBR_STATE_CLOSING;
+    }
+
+    uint32_t disconnectTimeout = FLAGS_ub_disconnect_timeout;
+    uint64_t startTime = GetCurNanoSeconds();
+
+    if (_trx->ubrTx.localTxEventQ.addr != nullptr && ((UbrEventQMsg 
*)_trx->ubrTx.localTxEventQ.addr)->flag == UBR_STATE_CONNECTED) {
+        ((UbrEventQMsg *)_trx->ubrTx.localTxEventQ.addr)->flag = 
UBR_STATE_CLOSED;
+        _trx->ubrTx.trxState = UBR_STATE_CLOSED;
+    }
+
+    if (_trx->ubrTx.remoteRxEventQ.addr != nullptr) {
+        ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->flag = 
UBR_STATE_CLOSED;
+    }
+    while (_trx->ubrRx.localRxEventQ.addr != nullptr && ((UbrEventQMsg 
*)_trx->ubrRx.localRxEventQ.addr)->flag != UBR_STATE_CLOSED) {
+        UbrSetSleepTask(UBR_TASK_CLOSE);
+        if (HasTimedOut(startTime, disconnectTimeout) != UBRING_OK) {
+            LOG(WARNING) << "Local shm " << _trx->localShm.name
+            << " wait for the peer to close timed out, force cleanup.";
+            _trx->ubrRx.trxState = UBR_STATE_CLOSED;
+            // Force synchronous cleanup instead of relying on async timer
+            DeleteTimerSafe((uint32_t)_trx->timerFd);
+            DeleteTimerSafe((uint32_t)_trx->hbTimerFd);
+            if (_trx->ubrTx.remoteRxEventQ.addr != nullptr) {
+                ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->flag = 
UBR_STATE_CLOSED;
+            }
+            if (UNLIKELY(ShmRemoteFree(&_trx->remoteShm) != UBRING_OK)) {
+                LOG(WARNING) << "Force close, remote shm " << 
_trx->remoteShm.name << " free failed.";
+            }
+            if (UNLIKELY(UbrTrxFreeShm(_trx) != UBRING_OK)) {
+                LOG(WARNING) << "Force close, local shm " << 
_trx->localShm.name << " free failed.";
+            }
+            if (UNLIKELY(UBRingManager::ReleaseUbrTrxFromMgr(_trx) != 
UBRING_OK)) {
+                LOG(WARNING) << "Force close, release trx " << 
_trx->localShm.name << " failed.";
+            }
+            return UBRING_ERR_TIMEOUT;
+        }
+        bthread_usleep(1000);  // 1ms, yield to other bthreads
+    }
+    _trx->ubrRx.trxState = UBR_STATE_CLOSED;
+    RETURN_CODE rc;
+    if (UNLIKELY((rc = ClearTrxResource(_trx, startTime, UBR_SEND_CLOSE)) != 
UBRING_OK)) {
+        if (rc == UBRING_REENTRY) {
+            LOG(INFO) << "Trx close, peer is closing, trx local name=" << 
_trx->localShm.name;
+            return UBRING_OK;
+        }
+        LOG(ERROR) << "Trx close, clear trx resource failed, trx local name=" 
<< _trx->localShm.name;
+        return UBRING_ERR;
+    }
+    // Unlink local shm name immediately so process exit does not leave 
visible leftovers.
+    RETURN_CODE unlinkRc = ShmFree(&_trx->localShm);
+    if (unlinkRc != UBRING_OK && unlinkRc != SHM_ERR_NOT_FOUND && unlinkRc != 
SHM_ERR_RESOURCE_ATTACHED) {
+        LOG(WARNING) << "Trx close, unlink local shm failed, trx local name=" 
<< _trx->localShm.name
+                     << ", rc=" << unlinkRc;
+    }
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRing::UbrAddCloseTimer() {
+    if (UNLIKELY(_trx == NULL)) {
+        LOG(ERROR) << "Trx add close timer failed, trx is null.";
+        return UBRING_ERR;
+    }
+
+    uint32_t eventQTimerInterval = FLAGS_ub_event_queue_timer_interval * 
TIME_COVERSION;
+    itimerspec timeSpec = {
+            .it_interval = {.tv_sec = 0, .tv_nsec = eventQTimerInterval},
+            .it_value = {.tv_sec = 0, .tv_nsec = 1}
+    };
+    int timerFd = TimerStart(&timeSpec, UbrTrxCloseCallback, (void*)_trx);
+    if (UNLIKELY(timerFd == -1)) {
+        LOG(ERROR) << "Start ubr close timer failed, trx local name=" << 
_trx->localShm.name;
+        return UBRING_ERR;
+    }
+    _trx->timerFd = timerFd;
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRing::UbrAddTimer() {
+    if (UNLIKELY(UbrAddCloseTimer() != UBRING_OK)) {
+        LOG(ERROR) << "Ubr " << _trx->localShm.name << " add closed timer 
failed.";
+        return UBRING_ERR;
+    }
+
+    if (UNLIKELY(UbrAddHBTimer() != UBRING_OK)) {
+        DeleteTimerSafe((uint32_t)_trx->timerFd);
+        LOG(ERROR) << "Ubr " << _trx->localShm.name << " add heartbeat timer 
failed.";
+        return UBRING_ERR;
+    }
+    return UBRING_OK;
+}
+
+void* UBRing::UbrTrxCloseCallback(void* args) {
+    auto* trx = (UbrTrx*) args;
+    if (UNLIKELY(UBRing::UbrTrxCallbackCheck(trx) != UBRING_OK)) {
+        return nullptr;
+    }
+
+    auto* localRxEventQ = (UbrEventQMsg *)trx->ubrRx.localRxEventQ.addr;
+    auto* localTxEventQ = (UbrEventQMsg *)trx->ubrTx.localTxEventQ.addr;
+    if (localRxEventQ->flag != UBR_STATE_CLOSED || localTxEventQ->flag == 
UBR_STATE_CLOSED) {
+        return nullptr;
+    }
+    trx->ubrRx.trxState = UBR_STATE_CLOSED;
+    int fd = (int)trx->localShm.fd;
+    do {
+        if (ATOMIC_LOAD(trx->closeCnt) == 0) {
+            break;
+        }
+        ATOMIC_SUB(trx->closeCnt, 1);
+
+        uint64_t startTime = GetCurNanoSeconds();
+
+        if (localTxEventQ->flag == UBR_STATE_CONNECTED || 
ATOMIC_LOAD(trx->closeCnt) == 1) {
+            localTxEventQ->flag = UBR_STATE_CLOSED;
+            trx->ubrTx.trxState = UBR_STATE_CLOSED;
+        }
+        UbrEventQMsg* remoteRxEventQ = (UbrEventQMsg 
*)trx->ubrTx.remoteRxEventQ.addr;
+        if (remoteRxEventQ == nullptr) {
+            LOG(ERROR) << "Trx close callback failed, " << trx->localShm.name 
<< " remoteRxEventQ is NULL.";
+            break;
+        }
+        remoteRxEventQ->flag = UBR_STATE_CLOSED;
+        RETURN_CODE clearRc = ClearTrxResource(trx, startTime, 
UBR_CALL_BACK_CLOSE, 1);
+        if (UNLIKELY(clearRc != UBRING_OK && clearRc != UBRING_REENTRY)) {
+            LOG(ERROR) << "Trx close callback failed, " << trx->localShm.name 
<< " clear trx resource failed.";
+            break;
+        }
+    } while (0);
+    return nullptr;
+}
+
+RETURN_CODE UBRing::UbrAddHBTimer() {
+    if (UNLIKELY(_trx == NULL)) {
+        LOG(ERROR) << "Trx add heartbeat timer failed, trx is null.";
+        return UBRING_ERR;
+    }
+
+    itimerspec timeSpec = {
+            .it_interval = {.tv_sec = FLAGS_ub_hb_timer_interval, .tv_nsec = 
0},
+            .it_value = {.tv_sec = 0, .tv_nsec = 1}
+    };
+    int timerFd = TimerStart(&timeSpec, UbrTrxHBCallback, (void*)_trx);
+    if (UNLIKELY(timerFd == -1)) {
+        LOG(ERROR) << "Start ubr heartbeat timer failed.";
+        return UBRING_ERR;
+    }
+    _trx->hbTimerFd = timerFd;
+    return UBRING_OK;
+}
+
+RETURN_CODE UBRing::UbrPassiveClearTrx(UbrTrx *trx, int fd, PASSIVE_DISC_TYPE 
type) {
+    RETURN_CODE passiveCloseCheckRc = UbrTrxCloseCheck(trx);
+    if (UNLIKELY(passiveCloseCheckRc != UBRING_OK)) {
+        if (passiveCloseCheckRc == UBRING_REENTRY) {
+            LOG(INFO) << "Passive close skipped, active close in progress, 
name=" << trx->localShm.name;
+            uint64_t startTime = GetCurNanoSeconds();
+            return ClearTrxResource(trx, startTime, UBR_CALL_BACK_CLOSE);
+        }
+        return UBRING_ERR;
+    }
+    trx->ubrTx.trxState = UBR_STATE_CLOSED;
+    trx->ubrRx.trxState = UBR_STATE_CLOSED;
+    DeleteTimerSafe((uint32_t)trx->timerFd);
+    const char *typeName = NULL;
+    if (type == UBR_HEARTBEAT) {
+        DeleteTimer((uint32_t)trx->hbTimerFd);
+        typeName = "Trx heartbeat";
+    } else if (type == UBR_UB_EVENT) {
+        DeleteTimerSafe((uint32_t)trx->hbTimerFd);
+        typeName = "Ub event callback";
+    }
+    bthread_usleep(FLAGS_ub_flying_io_timeout * 1000000LL);  // yield-friendly 
sleep
+
+    int rc = ShmLocalFree(&trx->remoteShm);
+    if (rc != UBRING_OK) {
+        LOG(ERROR) << typeName << ", delete remote shm failed. ret=" << rc;
+    }
+    rc = ShmLocalFree(&trx->localShm);
+    if (rc != UBRING_OK) {
+        LOG(ERROR) << typeName << ", delete local shm failed. ret=" << rc;
+    }
+
+    UBRingManager::ReleaseUbrTrxFromMgr(trx);
+    return UBRING_OK;
+}
+
+void* UBRing::UbrTrxHBCallback(void* args) {
+    auto* trx = (UbrTrx*) args;
+    if (UNLIKELY(UbrTrxCallbackCheck(trx) != UBRING_OK)) {
+        return NULL;
+    }
+
+    auto* localDataStatus = (UbrDataStatusQMsg 
*)trx->ubrTx.localDataStatusQ.addr;
+    auto* remoteDataStatus = (UbrDataStatusQMsg 
*)trx->ubrRx.remoteDataStatusQ.addr;
+    if (UNLIKELY(localDataStatus == NULL || remoteDataStatus == NULL)) {
+        LOG(ERROR) << "Heartbeat error, datastatus is NULL.";
+        return NULL;
+    }
+
+    if (trx->ubrTx.trxState != UBR_STATE_CONNECTED || trx->ubrRx.trxState != 
UBR_STATE_CONNECTED) {
+        LOG_EVERY_SECOND(INFO) << "Heartbeat cannot be started, wait connected 
state.";
+        return NULL;
+    }
+
+    remoteDataStatus->heartBeat = 1;
+    if (localDataStatus->heartBeat == 1) {
+        localDataStatus->heartBeat = 0;
+        trx->ubrTx.hbRetryCnt = 0;
+        return NULL;
+    }
+
+    ++trx->ubrTx.hbRetryCnt;
+    if (trx->ubrTx.hbRetryCnt <= FLAGS_ub_hb_retry_cnt) {
+        return NULL;
+    }
+
+    int fd = (int)trx->localShm.fd;
+    LOG(INFO) << "Hlc heartbeat, start to clear trx resource. hbTimerFd=" << 
fd << ", shmName=" << trx->localShm.name;
+    UbrPassiveClearTrx(trx, fd, UBR_HEARTBEAT);
+    LOG(INFO) << "Hlc heartbeat clear trx resource finish.";
+    return NULL;
+}
+
+RETURN_CODE UBRing::UbrAddAsynClearTimer(UbrTrx *trx) {
+    if (UNLIKELY(trx == NULL)) {
+        LOG(ERROR) << "Trx add close timer failed, trx is null.";
+        return UBRING_ERR;
+    }
+
+    if (trx->clearTimerFd > 0) {
+        return UBRING_OK;
+    }
+
+    itimerspec timeSpec = {
+            .it_interval = {.tv_sec = 0, .tv_nsec = 0},
+            .it_value = {.tv_sec = FLAGS_ub_flying_io_timeout, .tv_nsec = 0}
+    };
+
+    int timerFd = TimerStart(&timeSpec, UbrAsynClearCallback, (void*)trx);
+    if (UNLIKELY(timerFd == -1)) {
+        LOG(ERROR) << "Start ubr close timer failed, trx name=%s.", 
trx->localShm.name;
+        return UBRING_ERR;
+    }
+    trx->clearTimerFd = timerFd;
+    return UBRING_OK;
+}
+
+void *UBRing::UbrAsynClearCallback(void *args)
+{
+    auto* trx = (UbrTrx*) args;
+    if (UNLIKELY(trx == NULL)) {
+        LOG(ERROR) << "Trx close, trx is null.";
+        return NULL;
+    }
+
+    if (UNLIKELY(ShmRemoteFree(&trx->remoteShm) != UBRING_OK)) {
+        LOG(ERROR) << "Trx close, remote shm " << trx->remoteShm.name << " 
free failed.";
+    }
+
+    if (UNLIKELY(UbrTrxFreeShm(trx) != UBRING_OK)) {
+        LOG(ERROR) << "Trx close, wait for local shm " << trx->localShm.name 
<< " free fail.";
+    }
+
+    if (UNLIKELY(UBRingManager::ReleaseUbrTrxFromMgr(trx) != UBRING_OK)) {
+        LOG(ERROR) << "Trx close, release shm " << trx->localShm.name << " trx 
failed.";
+    }
+    return NULL;
+}
+
+int UBRing::UbrTrxSend(const void *buf, uint32_t bufLen)
+{
+    if (UNLIKELY(CheckTrxSendPreCheck(_trx) != UBRING_OK)) {
+        return UBRING_ERR;
+    }
+    // 1.2 计算空间
+    auto *dataStatusMsg = (UbrDataStatusQMsg 
*)_trx->ubrTx.localDataStatusQ.addr;
+    auto *dataMsg = (UbrMsgFormat *)_trx->ubrTx.remoteDataQ.addr;
+    uint32_t cap = _trx->ubrTx.capacity;
+    uint32_t tail = dataStatusMsg->tail;
+    uint32_t remainChunkNum =
+        (_trx->ubrTx.writePos > tail) ? (tail + cap - _trx->ubrTx.writePos) : 
(tail - _trx->ubrTx.writePos);
+    uint32_t needMsgChunkNum = CalcUbrMsgChunkCnt(bufLen);
+    if (remainChunkNum < needMsgChunkNum) {
+        return UBRING_RETRY;
+    }
+    UbrMsgFormat *msg = &(_trx->ubrTx.localMsgSpace);
+    uint32_t totalSendLen = 0;
+    uint32_t remainBufLen = bufLen;
+    uint8_t isLastPkt = 0;
+    _trx->ubrTx.outIoId++;
+    ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->ioId = 
_trx->ubrTx.outIoId;
+    while (remainBufLen > 0) {
+        isLastPkt = (uint8_t)(remainBufLen <= UBR_MSG_PAYLOAD_LEN);
+        msg->header[UBR_MSG_FLAG_INDEX] = isLastPkt ? UBR_MSG_CHUNK_EOF : 
UBR_MSG_CHUNK_EXIST;
+        msg->header[UBR_MSG_LEN_INDEX] = isLastPkt ? (uint8_t)remainBufLen : 
UBR_MSG_PAYLOAD_LEN;
+        msg->header[UBR_MSG_CUR_INDEX] = 0;
+        memcpy(msg->payload.inner, (const uint8_t *)buf + totalSendLen, 
msg->header[UBR_MSG_LEN_INDEX]);
+        Copy64Byte((int8_t *)&dataMsg[_trx->ubrTx.writePos], (int8_t *)msg);
+        _trx->ubrTx.writePos = (_trx->ubrTx.writePos + 1) % cap;
+        totalSendLen += msg->header[UBR_MSG_LEN_INDEX];
+        remainBufLen -= msg->header[UBR_MSG_LEN_INDEX];
+    }
+    return (int)totalSendLen;
+}
+
+int UBRing::UbrTrxRecv(void *buf, uint32_t bufLen)
+{
+    RETURN_CODE rc = UBRING_OK;
+    if (UNLIKELY((rc = CheckTrxRecvParam(_trx, buf, bufLen)) != UBRING_OK)) {
+        return (rc == UBR_NOT_CONNECTED) ? 0 : rc;
+    }
+    UbrMsgFormat *dataMsg = (UbrMsgFormat *)_trx->ubrRx.localDataQ.addr;
+    uint32_t readPosEnd = _trx->ubrRx.readPos;
+    uint8_t flag = dataMsg[readPosEnd].header[UBR_MSG_FLAG_INDEX];
+    if (flag == UBR_MSG_CHUNK_NONE) {
+        return UBRING_RETRY;
+    }
+    return UbrTrxRecvBlockMode(static_cast<uint8_t *>(buf), bufLen);
+}
+
+int UBRing::UbrTrxRecvBlockMode(uint8_t *dest, uint32_t bufLen)
+{
+    RETURN_CODE rc = UBRING_OK;
+    if (UNLIKELY((rc = CheckTrxRecvParam(_trx, dest, bufLen)) != UBRING_OK)) {
+        return (rc == UBR_NOT_CONNECTED) ? 0 : rc;
+    }
+
+    int32_t totalCopied = 0;
+    int32_t remainingLen = (int32_t)bufLen;
+    bool notEofEncountered = true;
+
+    UbrRx *ubrRx = &_trx->ubrRx;
+    UbrMsgFormat *dataMsg = (UbrMsgFormat *)ubrRx->localDataQ.addr;
+    bool needUpdateEpollEofPos = ubrRx->readPos == ubrRx->epEofPos;
+
+    while (notEofEncountered && remainingLen > 0) {
+        if (UNLIKELY(CheckTrxRecvPreCheck(_trx) != UBRING_OK)) {
+            return UBRING_ERR;
+        }
+        UbrMsgFormat *currentChunk = &dataMsg[ubrRx->readPos];
+        uint8_t flag = currentChunk->header[UBR_MSG_FLAG_INDEX];
+        if (flag == UBR_MSG_CHUNK_NONE) {
+            if (totalCopied > 0) {
+                break;
+            }
+            errno = EAGAIN;
+            return -1;
+        }
+        if (flag == UBR_MSG_CHUNK_EOF) {
+            notEofEncountered = false;
+        }
+        uint8_t chunkMsgLen = currentChunk->header[UBR_MSG_LEN_INDEX];
+        uint8_t curIndex = currentChunk->header[UBR_MSG_CUR_INDEX];
+        uint8_t availableData = chunkMsgLen - curIndex;
+
+        int32_t copyLen = (remainingLen < availableData) ? remainingLen : 
availableData;
+        memcpy(dest + totalCopied, dataMsg[ubrRx->readPos].payload.inner + 
curIndex, (size_t)copyLen);
+        totalCopied += copyLen;
+        remainingLen -= copyLen;
+        currentChunk->header[UBR_MSG_CUR_INDEX] += (uint8_t)copyLen;
+        if (LIKELY(currentChunk->header[UBR_MSG_CUR_INDEX] == chunkMsgLen)) {
+            currentChunk->header[UBR_MSG_FLAG_INDEX] = UBR_MSG_CHUNK_NONE;
+            UpdateDataQTail(_trx);
+            ubrRx->readPos = (ubrRx->readPos + 1) % ubrRx->capacity;
+        }
+    }
+    if (needUpdateEpollEofPos) {
+        ubrRx->epEofPos = ubrRx->readPos;
+    }
+    return (int)totalCopied;
+}
+
+ssize_t UBRing::UbrTrxWritev(const struct iovec *iov, int iovcnt)
+{
+    if (UNLIKELY(CheckTrxSendPreCheck(_trx) != UBRING_OK)) {
+        return UBRING_ERR;
+    }
+
+    size_t bufLen = 0;
+    for (int i = 0; i < iovcnt; i++) {
+        bufLen += iov[i].iov_len;
+    }
+    RETURN_CODE rc = WritevHasEnoughSpace(bufLen);
+    if (rc != UBRING_OK) {
+        return rc;
+    }
+
+    UbrMsgFormat *dataMsg = (UbrMsgFormat *)_trx->ubrTx.remoteDataQ.addr;
+    UbrMsgFormat *msg = &(_trx->ubrTx.localMsgSpace);
+    int curIov = 0;
+    size_t curIovPos = 0;
+    ssize_t totalSendLen = 0;
+    size_t pktRemainN = 0;
+    size_t iovRemain = 0;
+    size_t fulled = 0;
+    uint8_t isLastPkt = 0;
+    uint8_t curPktLen = 0;
+    _trx->ubrTx.outIoId++;
+    ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->ioId = 
_trx->ubrTx.outIoId;
+    while (bufLen > 0) {
+        isLastPkt = (uint8_t)(bufLen <= UBR_MSG_PAYLOAD_LEN);
+        curPktLen = isLastPkt ? (uint8_t)bufLen : UBR_MSG_PAYLOAD_LEN;
+        msg->header[UBR_MSG_FLAG_INDEX] = isLastPkt ? UBR_MSG_CHUNK_EOF : 
UBR_MSG_CHUNK_EXIST;
+        msg->header[UBR_MSG_LEN_INDEX] = curPktLen;
+        msg->header[UBR_MSG_CUR_INDEX] = 0;
+        pktRemainN = curPktLen;
+        while (curIov < iovcnt && pktRemainN > 0) {
+            iovRemain = (iov[curIov].iov_len - curIovPos);
+            fulled = iovRemain > pktRemainN ? pktRemainN : iovRemain;
+            memcpy((msg->payload.inner + (curPktLen - (uint8_t)pktRemainN)),

Review Comment:
   Does UB complete the transfer simply by copying the memory out of the IOBuf?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to