chenBright commented on code in PR #3290: URL: https://github.com/apache/brpc/pull/3290#discussion_r3377664196
########## src/brpc/ubshm/timer/timer_mgr.cpp: ########## @@ -0,0 +1,468 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#define _GNU_SOURCE +#include <pthread.h> +#include <sched.h> +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <atomic> +#include <sys/resource.h> +#include "brpc/ubshm/timer/timer_mgr.h" + +namespace brpc { +namespace ubring { + +int32_t g_epollFd = -1; +std::atomic<uint32_t> g_totalTimerNum; +TimerFdCtx *g_timerFdCtxMap = NULL; +uint32_t maxSystemFd; +static pthread_t g_epollExecuteThread; +static int32_t g_timerModuleInitialized; Review Comment: 1. These variables need to be set with default values. 2. The variable name should be snake_case. ########## src/brpc/ubshm/timer/timer_mgr.cpp: ########## @@ -0,0 +1,468 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#define _GNU_SOURCE +#include <pthread.h> +#include <sched.h> +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <atomic> +#include <sys/resource.h> +#include "brpc/ubshm/timer/timer_mgr.h" + +namespace brpc { +namespace ubring { + +int32_t g_epollFd = -1; +std::atomic<uint32_t> g_totalTimerNum; +TimerFdCtx *g_timerFdCtxMap = NULL; +uint32_t maxSystemFd; +static pthread_t g_epollExecuteThread; +static int32_t g_timerModuleInitialized; + +#if defined(OS_MACOSX) +static int timerfd_create_macosx(int clockid, int flags); +static int timerfd_settime_macosx(int fd, int flags, + const itimerspec *new_value, + itimerspec *old_value); +#endif + +static RETURN_CODE DeleteTimerInner(uint32_t fd) { + if (g_timerFdCtxMap == NULL) { + return UBRING_OK; + } + + if (pthread_spin_lock(&g_timerFdCtxMap[fd].spinLock) != 0) { + return UBRING_ERR; + } + + if (g_timerFdCtxMap[fd].status == TIMER_CONTEXT_NOT_USING) { + pthread_spin_unlock(&g_timerFdCtxMap[fd].spinLock); + return UBRING_OK; + } + + g_timerFdCtxMap[fd].status = TIMER_CONTEXT_NOT_USING; + g_timerFdCtxMap[fd].cb = NULL; + g_timerFdCtxMap[fd].args = NULL; + g_timerFdCtxMap[fd].periodical = 0; + g_timerFdCtxMap[fd].fd = 0; + + pthread_spin_unlock(&g_timerFdCtxMap[fd].spinLock); + +#if defined(OS_LINUX) + epoll_ctl(g_epollFd, EPOLL_CTL_DEL, (int)fd, NULL); +#elif defined(OS_MACOSX) + struct kevent evt; + EV_SET(&evt, fd, EVFILT_TIMER, EV_DELETE, 0, 0, NULL); + kevent(g_epollFd, &evt, 1, NULL, 0, NULL); +#endif + + uint64_t exp = 0; + read((int)fd, &exp, sizeof(exp)); 
+ + close((int)fd); + atomic_fetch_sub(&g_totalTimerNum, 1); + return UBRING_OK; +} + +static RETURN_CODE StartTimeEpoll(void) { +#if defined(OS_LINUX) + g_epollFd = epoll_create1(0); +#elif defined(OS_MACOSX) + g_epollFd = kqueue(); +#endif + if (UNLIKELY(g_epollFd == -1)) { + LOG(ERROR) << "Failed to create epoll/kqueue. errno=" << errno; + return UBRING_ERR; + } + + int ret = pthread_create(&g_epollExecuteThread, NULL, TimerEpoll, NULL); + if (UNLIKELY(ret != 0)) { + LOG(ERROR) << "Failed to create thread err=" << ret; + return UBRING_ERR; + } + return UBRING_OK; +} + +static RETURN_CODE TimerSpinLocksInit(void) { + if (g_timerFdCtxMap == NULL) { + LOG(ERROR) << "Timer module is not fully initialized."; + return UBRING_ERR; + } + + for (uint32_t fd = 0; fd < maxSystemFd; fd++) { + int ret = pthread_spin_init(&g_timerFdCtxMap[fd].spinLock, + PTHREAD_PROCESS_PRIVATE); + if (ret != EOK) { + LOG(ERROR) << "Failed to initialize spin lock for fd=" << fd; + for (uint32_t cleanupFd = 0; cleanupFd < fd; cleanupFd++) { + pthread_spin_destroy(&g_timerFdCtxMap[cleanupFd].spinLock); + } + return UBRING_ERR; + } + } + return UBRING_OK; +} + +static RETURN_CODE ExecuteCallback(int32_t timerFd) { + UnifiedCallback((void *)(&g_timerFdCtxMap[timerFd])); + return UBRING_OK; +} + +static RETURN_CODE TimerCtxMapCompletion(void) { + memset(g_timerFdCtxMap, 0, sizeof(TimerFdCtx) * maxSystemFd); + + RETURN_CODE ret = TimerSpinLocksInit(); + if (ret != UBRING_OK) { + LOG(ERROR) << "Failed to init spin locks for timer module."; + return UBRING_ERR; + } + return UBRING_OK; +} + +RETURN_CODE TimerInit(void) { + if (g_timerModuleInitialized > 0) { + return UBRING_OK; + } + + g_totalTimerNum.store(0); + + struct rlimit rlim; + if (getrlimit(RLIMIT_NOFILE, &rlim) != UBRING_OK) { + LOG(ERROR) << "Failed to get fd"; + return UBRING_ERR; + } + maxSystemFd = (uint32_t)rlim.rlim_cur; + + if (g_timerFdCtxMap == NULL) { + g_timerFdCtxMap = (TimerFdCtx *)malloc(sizeof(TimerFdCtx) * maxSystemFd); 
Review Comment: g_timerFdCtxMap may consume a lot of memory: TimerInit allocates one TimerFdCtx per possible fd up to the RLIMIT_NOFILE soft limit (sizeof(TimerFdCtx) * maxSystemFd), which can be very large regardless of how many timers are actually used. ########## src/brpc/ubshm/ubr_msg.h: ########## @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_UBR_MSG_H +#define BRPC_UBR_MSG_H +#define UBR_MSG_HEADER_LEN 4 +#define UBR_MSG_PAYLOAD_LEN 60 +#define UBR_MSG_LEN (UBR_MSG_HEADER_LEN + UBR_MSG_PAYLOAD_LEN) + +#define UBR_MSG_FLAG_INDEX 0 +#define UBR_MSG_LEN_INDEX 1 +#define UBR_MSG_CUR_INDEX 2 + +namespace brpc { +namespace ubring { +typedef enum { + UBR_MSG_CHUNK_NONE = 0, + UBR_MSG_CHUNK_EXIST = 1, + UBR_MSG_CHUNK_EOF = 2 +} UbrMsgHdrFlag; + +typedef struct TagUbrMsgPayload { + uint8_t inner[UBR_MSG_PAYLOAD_LEN]; +} UbrMsgPayload; + +typedef struct __attribute__((aligned(64))) TagUbrMsgFormat { Review Comment: Use BAIDU_CACHELINE_ALIGNMENT instead. 
########## src/brpc/ubshm/common/thread_lock.h: ########## @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_THREAD_LOCK_H +#define BRPC_THREAD_LOCK_H +#include <unistd.h> +#include <stdlib.h> +#include <fcntl.h> +#include <semaphore.h> +#include <pthread.h> +#include "brpc/ubshm/common/common.h" + +#ifdef __cplusplus +extern "C" { +#endif + +static inline void UnlockMutex(pthread_mutex_t **mtx) +{ + if (LIKELY(mtx != NULL && *mtx != NULL)) { + pthread_mutex_unlock(*mtx); + } else { + LOG(ERROR) << "Invalid input for mtx."; + } +} + +#define LOCK_GUARD(mtxPtr) \ + pthread_mutex_t *__attribute__((cleanup(UnlockMutex))) _mtxPtr = ({ \ + pthread_mutex_lock(&(mtxPtr)); \ + &(mtxPtr); \ + }) + +static inline void UnlockSpinLock(pthread_spinlock_t **spinLock) +{ + if (LIKELY(spinLock != NULL && *spinLock != NULL)) { + pthread_spin_unlock(*spinLock); + } else { + LOG(ERROR) << "Invalid input for spinLock."; + } +} + +#define SPIN_LOCK_GUARD(spinLockPtr) \ + pthread_spinlock_t *__attribute__((cleanup(UnlockSpinLock))) _spinLockPtr = ({ \ + pthread_spin_lock(&(spinLockPtr)); \ + &(spinLockPtr); \ + }) Review Comment: Use BAIDU_SCOPED_LOCK or std::lock_guard instead. ########## src/brpc/ubshm/common/common.h: ########## @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_COMMON_H +#define BRPC_COMMON_H +#include <unistd.h> +#include <stdint.h> +#include <stdlib.h> +#include <time.h> +#include "butil/logging.h" + +#define LIKELY(x) __builtin_expect(!!(x), 1) +#define UNLIKELY(x) __builtin_expect(!!(x), 0) Review Comment: Use BAIDU_LIKELY and BAIDU_UNLIKELY instead. ########## src/brpc/ubshm/shm/shm_ubs.cpp: ########## @@ -0,0 +1,565 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#define _GNU_SOURCE +#include <stdlib.h> +#include <assert.h> +#include <errno.h> +#include <dlfcn.h> +#include <time.h> +#include <gflags/gflags.h> +#include "brpc/ubshm/timer/timer_mgr.h" +#include "brpc/ubshm/common/thread_lock.h" +#include "brpc/ubshm/common/common.h" +#include "brpc/ubshm/shm/shm_def.h" +#include "brpc/ubshm/ub_ring_manager.h" +#include "brpc/ubshm/ubs_mem/ubs_mem.h" +#include "brpc/ubshm/ubs_mem/ubs_mem_def.h" +#ifdef UT +#include "ubs_mem.h" +#endif +#include "shm_ubs.h" + +namespace brpc { +namespace ubring { +#define UBRING_MK_UBSM(ret, fn, args) ret (*fn) args = NULL +#include "brpc/ubshm/ubs_mem/declare_shm_ubs.h" +#define SHM_RIGHT_MODE 0666 +#define UBRING_REGION_NAME_PREFIX "UbrONE2ALLRegion" +DEFINE_uint32(node_location, 1, "Location of the ub machine."); +DEFINE_bool(shm_wr_delay_comp, true, "Indicates whether to enable the write relay." + "0: relay; 1: non-relay."); +DEFINE_int32(ub_flying_io_timeout, 5, "Waiting time for stopping data" + "sending and receiving when the link is disconnected."); +char g_regionName[MAX_REGION_NAME_DESC_LENGTH] = {0}; +int g_shmTimerFd = 0; +ShmList *g_shmList = NULL; +static RETURN_CODE UbsShmInterfacesLoad(void); +char hostname[MAX_HOST_NAME_DESC_LENGTH]; + +RETURN_CODE UbsShmInterfacesLoad(void) +{ +#ifndef UT + const char *ubsmSdkLocation = "/usr/local/ubs_mem/lib/libubsm_sdk.so"; +#if defined(OS_LINUX) + void* dlhandler = dlmopen(LM_ID_NEWLM, ubsmSdkLocation, RTLD_NOW | RTLD_LOCAL | RTLD_NODELETE | RTLD_DEEPBIND); +#elif defined(OS_MACOSX) + void* dlhandler = dlopen(ubsmSdkLocation, RTLD_NOW | RTLD_LOCAL | RTLD_NODELETE); +#endif + if (dlhandler == NULL) { + LOG(ERROR) << "Dlopen libubsm_sdk.so in " << ubsmSdkLocation << " failed, error:" << dlerror(); + return UBRING_ERR; + } + +#define UBRING_MK_UBSM_OPTIONAL(ret, fn, args) \ + do { \ + fn = (decltype(fn))dlsym(dlhandler, #fn); \ + } while (0) + +#define UBRING_MK_UBSM(ret, fn, args) \ + do { \ + if ((fn) != NULL) { \ + break; \ + } \ + 
UBRING_MK_UBSM_OPTIONAL(ret, fn, args); \ + if ((fn) == NULL) { \ + LOG(ERROR) << "Fail load ubs_mem func " << #fn <<" error:" << dlerror(); \ + return UBRING_ERR; \ + } \ + } while (0) +#include "brpc/ubshm/ubs_mem/declare_shm_ubs.h" + + dlclose(dlhandler); + dlhandler = NULL; +#endif + return UBRING_OK; +} + +static RETURN_CODE CreateUbsShmRegion(const char *regionName) +{ + int ret = snprintf(g_regionName, MAX_REGION_NAME_DESC_LENGTH, "%s_%u", + UBRING_REGION_NAME_PREFIX, FLAGS_node_location); + if (ret < 0) { + LOG(ERROR) << "Snprintf_s region name failed, ret=" << ret; + return UBRING_ERR; + } + + ubsmem_regions_t regions = {0}; // 16 * (48 + 1) bytes, 约0.8k + ret = ubsmem_lookup_regions(®ions); + if (ret != UBSM_OK || regions.region[0].host_num <= 0) { + LOG(ERROR) << "Ubs lookup share region failed, ret=" << ret << ", region.num=" << regions.region[0].host_num; + return UBRING_ERR; + } + ubsmem_region_attributes_t regionAttr = {0}; + regionAttr.host_num = regions.region[0].host_num; + for (int i = 0; i < regionAttr.host_num; i++) { + strcpy(regionAttr.hosts[i].host_name, regions.region[0].hosts[i].host_name); + regionAttr.hosts[i].affinity = (strcmp(regionAttr.hosts[i].host_name, hostname) == 0) ? 
+ true : false; + } + + ret = ubsmem_create_region(regionName, 0, ®ionAttr); + if (ret == UBSM_ERR_ALREADY_EXIST) { + LOG(WARNING) << "Ubs region exists, region_name=" << regionName; + return UBRING_OK; + } else if (ret != UBSM_OK) { + LOG(ERROR) << "Ubsmem create region failed, ret=" << ret; + return UBRING_ERR; + } + + return UBRING_OK; +} + +static uint64_t AquireFlagIfWrDelayComp(const uint64_t flag) +{ + if (FLAGS_shm_wr_delay_comp == 0) { + return flag; + } + return flag | UBSM_FLAG_WR_DELAY_COMP; +} + +RETURN_CODE UbsShmLocalMalloc(SHM *shm) +{ + int ret = ubsmem_shmem_allocate(g_regionName, shm->name, shm->len, SHM_RIGHT_MODE, + AquireFlagIfWrDelayComp(UBSM_FLAG_ONLY_IMPORT_NONCACHE | UBSM_FLAG_MEM_ANONYMOUS)); +do { + if (ret == UBSM_ERR_ALREADY_EXIST) { + if (ubsmem_shmem_deallocate(shm->name) != UBSM_OK) { + LOG(ERROR) << "Ubs create shm name=" << shm->name << " failed, shm exists, ret=" << ret; + return SHM_ERR_EXIST; + } + LOG(INFO) << "Ubs delete shm name=" << shm->name << " success, try to recreate."; + ret = ubsmem_shmem_allocate(g_regionName, shm->name, shm->len, SHM_RIGHT_MODE, + AquireFlagIfWrDelayComp(UBSM_FLAG_ONLY_IMPORT_NONCACHE | UBSM_FLAG_MEM_ANONYMOUS)); + if (ret != UBSM_OK) { + LOG(ERROR) << "Ubs recreate shm name=" << shm->name << " failed, ret=" << ret; + return SHM_ERR; + } + } else if (ret != UBSM_OK) { + LOG(ERROR) << "Ubs create shm name=" << shm->name << " failed, ret=" << ret; + return SHM_ERR; + } +} while (0); + + ret = ubsmem_shmem_map(NULL, shm->len, PROT_READ | PROT_WRITE, MAP_SHARED, shm->name, 0, (void**)&(shm->addr)); + if (ret != UBSM_OK) { + LOG(ERROR) << "Ubs map shm=" << shm->name << " failed, ret=" << ret; + if (ret == UBSM_ERR_NOT_FOUND) { + return SHM_ERR_NOT_FOUND; + } + ubsmem_shmem_deallocate(shm->name); + return SHM_ERR; + } + + // 通过MXE获取memid + shm->memid = 1; // 暂时打桩 + LOG(INFO) << "Ubs malloc local shm=" << shm->name << " length=" << shm->len << " memid=" << shm->memid << " success."; + return UBRING_OK; +} 
+ +RETURN_CODE UbsShmMunmap(SHM *shm) +{ + // unmap + if (shm->addr == NULL) { + LOG(ERROR) << "Ubs input shm param is invalid, addr is NULL."; + return SHM_ERR_INPUT_INVALID; + } + + int ret = ubsmem_shmem_unmap(shm->addr, shm->len); + if (ret != UBSM_OK) { + if (ret == UBSM_ERR_NET) { + LOG(ERROR) << "Ubs unmap shm=" << shm->name << " failed, ubsm net err=" << ret; + AddShmToList(g_shmList, shm); + return SHM_ERR_UBSM_NET_ERR; + } + LOG(ERROR) << "Ubs unmap shm=" << shm->name << " length=" << shm->len << " failed, ret=" << ret; + return SHM_ERR; + } + + LOG(INFO) << "Ubs unmap shm=" << shm->name << " length=" << shm->len << " success."; + return UBRING_OK; +} + +RETURN_CODE UbsShmFree(SHM *shm) +{ + if (shm->addr == NULL) { + LOG(ERROR) << "Ubs input shm param is invalid, addr is NULL."; + return SHM_ERR_INPUT_INVALID; + } + + // free + int ret = ubsmem_shmem_deallocate(shm->name); + if (ret != UBSM_OK) { + if (ret == UBSM_ERR_IN_USING) { + LOG(INFO) << "Ubs free shm=" << shm->name << " failed, resource attached=" << ret; + return SHM_ERR_RESOURCE_ATTACHED; + } else if (ret == UBSM_ERR_NOT_FOUND) { + LOG(INFO) << "Ubs free shm=" << shm->name << " failed, resource not found=" << ret; + return SHM_ERR_NOT_FOUND; + } + LOG(ERROR) << "Ubs free shm="<< shm->name << " failed, ret=" << ret; + return SHM_ERR; + } + shm->addr = NULL; + LOG(INFO) << "Ubs free shm=" << shm->name << " length=" << shm->len << " success."; + return UBRING_OK; +} + +RETURN_CODE UbsShmLocalFree(SHM *shm) +{ + // unmap + if (shm->addr == NULL) { + LOG(ERROR) << "Ubs input shm param is invalid, addr is NULL."; + return SHM_ERR_INPUT_INVALID; + } + + int ret = ubsmem_shmem_unmap(shm->addr, shm->len); + if (ret != UBSM_OK) { + if (ret == UBSM_ERR_NET) { + LOG(ERROR) << "Ubs unmap shm=" << shm->name << " failed, ubsm net err=" << ret; + AddShmToList(g_shmList, shm); + return SHM_ERR_UBSM_NET_ERR; + } + LOG(WARNING) << "Ubs unmap shm=" << shm->name << " length=" << shm->len << " failed, ret=" << ret; 
+ } + + // free + ret = ubsmem_shmem_deallocate(shm->name); + if (ret != UBSM_OK) { + if (ret == UBSM_ERR_IN_USING) { + LOG_EVERY_SECOND(INFO) << "Ubs delete shm=" << shm->name << " failed, resource attached=" << ret; + return SHM_ERR_RESOURCE_ATTACHED; + } + LOG(ERROR) << "Ubs delete shm=" << shm->name << " failed, ret=" << ret; + return SHM_ERR; + } + shm->addr = NULL; + LOG(INFO) << "Ubs free local shm=" << shm->name << " length=" << shm->len << " success."; + return UBRING_OK; +} + +RETURN_CODE UbsShmRemoteMalloc(SHM *shm) +{ + int ret = ubsmem_shmem_map(NULL, shm->len, PROT_READ | PROT_WRITE, MAP_SHARED, shm->name, 0, (void**)&(shm->addr)); + if (ret != UBSM_OK) { + LOG(ERROR) << "Ubs map Shm=" << shm->name << " failed, ret=" << ret; + return SHM_ERR; + } + + LOG(INFO) << "Ubs malloc remote shm=" << shm->name << " length=" << shm->len << " success."; + return UBRING_OK; +} + +RETURN_CODE UbsShmLocalMmap(SHM *shm, int prot) +{ + int ret = ubsmem_shmem_map(NULL, shm->len, prot, MAP_SHARED, shm->name, 0, (void**)&(shm->addr)); + if (ret != UBSM_OK) { + LOG(ERROR) << "Ubs map Shm=" << shm->name << " failed, ret=" << ret; + return SHM_ERR; + } + + LOG(INFO) << "Ubs mmap remote shm=" << shm->name << " length=" << shm->len << " success."; + return UBRING_OK; +} + +RETURN_CODE UbsShmRemoteFree(SHM *shm) +{ + // unmap + if (shm->addr == NULL) { + LOG(ERROR) << "Ubs input shm param is invalid, addr is NULL."; + return SHM_ERR_INPUT_INVALID; + } + + int ret = ubsmem_shmem_unmap(shm->addr, shm->len); + if (ret != UBSM_OK) { + if (ret == UBSM_ERR_NET) { + LOG(ERROR) << "Ubs unmap shm=" << shm->name << " failed, ubsm net err=" << ret; + AddShmToList(g_shmList, shm); + return SHM_ERR_UBSM_NET_ERR; + } + LOG(ERROR) << "Ubs unmap shm=" << shm->name << " length=" << shm->len << " failed, ret=" << ret; + return SHM_ERR; + } + + LOG(INFO) << "Ubs free Remote shm=" << shm->name << " length=" << shm->len << " success."; + return UBRING_OK; +} + +void UbsMemLoggerPrint(int level, 
const char *msg) +{ + if (level == UBSM_LOG_ERROR_LEVEL) { + LOG(ERROR) << msg; + } else if (level == UBSM_LOG_WARN_LEVEL) { + LOG(WARNING) << msg; + } else { + LOG(INFO) << msg; + } + return; +} + +RETURN_CODE UbsShmInit(void) +{ + // 加载libubsm_sdk.so函数指针 Review Comment: Please use English. ########## src/brpc/ubshm/timer/timer_mgr.cpp: ########## @@ -0,0 +1,468 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#define _GNU_SOURCE +#include <pthread.h> +#include <sched.h> +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <unistd.h> +#include <atomic> +#include <sys/resource.h> +#include "brpc/ubshm/timer/timer_mgr.h" + +namespace brpc { +namespace ubring { + +int32_t g_epollFd = -1; +std::atomic<uint32_t> g_totalTimerNum; +TimerFdCtx *g_timerFdCtxMap = NULL; +uint32_t maxSystemFd; +static pthread_t g_epollExecuteThread; +static int32_t g_timerModuleInitialized; + +#if defined(OS_MACOSX) +static int timerfd_create_macosx(int clockid, int flags); +static int timerfd_settime_macosx(int fd, int flags, + const itimerspec *new_value, + itimerspec *old_value); +#endif + +static RETURN_CODE DeleteTimerInner(uint32_t fd) { + if (g_timerFdCtxMap == NULL) { + return UBRING_OK; + } + + if (pthread_spin_lock(&g_timerFdCtxMap[fd].spinLock) != 0) { + return UBRING_ERR; + } + + if (g_timerFdCtxMap[fd].status == TIMER_CONTEXT_NOT_USING) { + pthread_spin_unlock(&g_timerFdCtxMap[fd].spinLock); + return UBRING_OK; + } + + g_timerFdCtxMap[fd].status = TIMER_CONTEXT_NOT_USING; + g_timerFdCtxMap[fd].cb = NULL; + g_timerFdCtxMap[fd].args = NULL; + g_timerFdCtxMap[fd].periodical = 0; + g_timerFdCtxMap[fd].fd = 0; + + pthread_spin_unlock(&g_timerFdCtxMap[fd].spinLock); + +#if defined(OS_LINUX) + epoll_ctl(g_epollFd, EPOLL_CTL_DEL, (int)fd, NULL); +#elif defined(OS_MACOSX) + struct kevent evt; + EV_SET(&evt, fd, EVFILT_TIMER, EV_DELETE, 0, 0, NULL); + kevent(g_epollFd, &evt, 1, NULL, 0, NULL); +#endif + + uint64_t exp = 0; + read((int)fd, &exp, sizeof(exp)); + + close((int)fd); + atomic_fetch_sub(&g_totalTimerNum, 1); + return UBRING_OK; +} + +static RETURN_CODE StartTimeEpoll(void) { +#if defined(OS_LINUX) + g_epollFd = epoll_create1(0); +#elif defined(OS_MACOSX) + g_epollFd = kqueue(); +#endif + if (UNLIKELY(g_epollFd == -1)) { + LOG(ERROR) << "Failed to create epoll/kqueue. 
errno=" << errno; + return UBRING_ERR; + } + + int ret = pthread_create(&g_epollExecuteThread, NULL, TimerEpoll, NULL); + if (UNLIKELY(ret != 0)) { + LOG(ERROR) << "Failed to create thread err=" << ret; + return UBRING_ERR; + } + return UBRING_OK; +} + +static RETURN_CODE TimerSpinLocksInit(void) { + if (g_timerFdCtxMap == NULL) { + LOG(ERROR) << "Timer module is not fully initialized."; + return UBRING_ERR; + } + + for (uint32_t fd = 0; fd < maxSystemFd; fd++) { + int ret = pthread_spin_init(&g_timerFdCtxMap[fd].spinLock, + PTHREAD_PROCESS_PRIVATE); + if (ret != EOK) { + LOG(ERROR) << "Failed to initialize spin lock for fd=" << fd; + for (uint32_t cleanupFd = 0; cleanupFd < fd; cleanupFd++) { + pthread_spin_destroy(&g_timerFdCtxMap[cleanupFd].spinLock); + } + return UBRING_ERR; + } + } + return UBRING_OK; +} + +static RETURN_CODE ExecuteCallback(int32_t timerFd) { + UnifiedCallback((void *)(&g_timerFdCtxMap[timerFd])); + return UBRING_OK; +} + +static RETURN_CODE TimerCtxMapCompletion(void) { + memset(g_timerFdCtxMap, 0, sizeof(TimerFdCtx) * maxSystemFd); + + RETURN_CODE ret = TimerSpinLocksInit(); + if (ret != UBRING_OK) { + LOG(ERROR) << "Failed to init spin locks for timer module."; + return UBRING_ERR; + } + return UBRING_OK; +} + +RETURN_CODE TimerInit(void) { + if (g_timerModuleInitialized > 0) { + return UBRING_OK; + } + + g_totalTimerNum.store(0); + + struct rlimit rlim; + if (getrlimit(RLIMIT_NOFILE, &rlim) != UBRING_OK) { + LOG(ERROR) << "Failed to get fd"; + return UBRING_ERR; + } + maxSystemFd = (uint32_t)rlim.rlim_cur; + + if (g_timerFdCtxMap == NULL) { + g_timerFdCtxMap = (TimerFdCtx *)malloc(sizeof(TimerFdCtx) * maxSystemFd); + if (UNLIKELY(!g_timerFdCtxMap)) { + LOG(ERROR) << "Fail to malloc space for timer modules. errno=%d", errno; + return UBRING_ERR; + } + + RETURN_CODE ret = TimerCtxMapCompletion(); + if (ret != UBRING_OK) { + LOG(ERROR) << "Failed to init main data structure of Time Module. 
ret=" << ret; + free(g_timerFdCtxMap); + g_timerFdCtxMap = NULL; + return UBRING_ERR; + } + } + + RETURN_CODE ret = StartTimeEpoll(); + if (ret != UBRING_OK) { + LOG(ERROR) << "Failed to start Timer Epoll. ret=" << ret; + if (LIKELY(g_timerFdCtxMap != NULL)) { + FREE_PTR(g_timerFdCtxMap); + } + return UBRING_ERR; + } + g_timerModuleInitialized = 1; + return UBRING_OK; +} + +void *UnifiedCallback(void *args) { + TimerFdCtx *ctx = (TimerFdCtx *)args; + if (pthread_spin_lock(&ctx->spinLock) != 0) { + return NULL; + } + + if (ctx->status == TIMER_CONTEXT_NOT_USING) { + pthread_spin_unlock(&ctx->spinLock); + return NULL; + } + + void *(*cb)(void *) = ctx->cb; + void *cbArgs = ctx->args; + uint32_t fd = ctx->fd; + int isPeriodical = ctx->periodical; + ctx->status = TIMER_CONTEXT_CALLBACK_ONGOING; + + pthread_spin_unlock(&ctx->spinLock); + + cb(cbArgs); + + if (!isPeriodical) { + DeleteTimerInner(fd); + } + return NULL; +} + +void *TimerEpoll(void *args) { + UNREFERENCE_PARAM(args); +#if defined(OS_LINUX) + struct epoll_event readyEvents[MAX_TIMER]; +#elif defined(OS_MACOSX) + struct kevent readyEvents[MAX_TIMER]; +#endif + + while (1) { + if (g_timerModuleInitialized <= 0) { + LOG(ERROR) << "The Timer module is not initialized."; + break; + } + +#if defined(OS_LINUX) + int32_t readyNum = epoll_wait(g_epollFd, readyEvents, MAX_TIMER, + TIMER_EPOLL_WAIT_TIMEOUT); +#elif defined(OS_MACOSX) + struct timespec timeout = {0, TIMER_EPOLL_WAIT_TIMEOUT * 1000000}; + int32_t readyNum = kevent(g_epollFd, NULL, 0, readyEvents, MAX_TIMER, &timeout); +#endif + + if (UNLIKELY(readyNum == -1)) { + errno_t err = errno; + if (err == EINTR) { + LOG_EVERY_SECOND(WARNING) << "Epoll/Kqueue wait was interrupted. errno=" << err; + continue; + } else if (err == EBADF) { + LOG(WARNING) << "The Timer module is destroyed."; + break; + } + LOG(ERROR) << "Epoll/Kqueue wait internal error. 
errno=" << err; + break; + } + + for (int32_t i = 0; i < readyNum; i++) { +#if defined(OS_LINUX) + struct epoll_event *event = &readyEvents[i]; + int32_t timerFd = event->data.fd; +#elif defined(OS_MACOSX) + struct kevent *event = &readyEvents[i]; + int32_t timerFd = event->ident; +#endif + + uint64_t exp = 0; + if (read(timerFd, &exp, sizeof(exp)) < 0) { + if (errno != EBADF) { + LOG(ERROR) << "Failed to read timerfd=" << timerFd << " errno=" << errno; + } + continue; + } + if (TimerFdCtxValidate((uint32_t)timerFd) != UBRING_OK) { + continue; + } + + RETURN_CODE ret = ExecuteCallback(timerFd); + if (ret != UBRING_OK) { + LOG(ERROR) << "Failed execute callback ret=" << ret; + DeleteTimerInner((uint32_t)timerFd); + continue; + } + } + } + return NULL; +} + +void DeleteTimerSafe(uint32_t fd) { + if (g_timerFdCtxMap == NULL) { + return; + } + + if (pthread_spin_lock(&g_timerFdCtxMap[fd].spinLock) != 0) { + return; + } + + if (g_timerFdCtxMap[fd].status == TIMER_CONTEXT_NOT_USING) { + pthread_spin_unlock(&g_timerFdCtxMap[fd].spinLock); + return; + } + + g_timerFdCtxMap[fd].status = TIMER_CONTEXT_NOT_USING; + g_timerFdCtxMap[fd].cb = NULL; + g_timerFdCtxMap[fd].args = NULL; + g_timerFdCtxMap[fd].periodical = 0; + g_timerFdCtxMap[fd].fd = 0; + + pthread_spin_unlock(&g_timerFdCtxMap[fd].spinLock); + +#if defined(OS_LINUX) + epoll_ctl(g_epollFd, EPOLL_CTL_DEL, (int)fd, NULL); +#elif defined(OS_MACOSX) + struct kevent evt; + EV_SET(&evt, fd, EVFILT_TIMER, EV_DELETE, 0, 0, NULL); + kevent(g_epollFd, &evt, 1, NULL, 0, NULL); +#endif + + uint64_t exp = 0; + read((int)fd, &exp, sizeof(exp)); + + close((int)fd); + atomic_fetch_sub(&g_totalTimerNum, 1); +} + +void DeleteTimer(uint32_t fd) { + if (g_timerFdCtxMap == NULL) { + LOG(WARNING) << "The timer is not initialized."; + return; + } + + g_timerFdCtxMap[fd].periodical = 0; +} + +int32_t TimerStart(const itimerspec *time, void *(*cb)(void *), void *args) { + if (g_epollFd == -1) { + LOG(ERROR) << "Timer epoll/kqueue encountered 
internal error."; + return -1; + } + +#if defined(OS_LINUX) + int timerFd = timerfd_create(CLOCK_MONOTONIC, 0); +#elif defined(OS_MACOSX) + int timerFd = timerfd_create_macosx(CLOCK_MONOTONIC, 0); +#endif + + if (UNLIKELY(timerFd >= (int)maxSystemFd || timerFd == -1)) { + LOG(ERROR) << "Failed to create timerfd=" << timerFd << " errno=" << errno; + return -1; + } + + g_timerFdCtxMap[timerFd].status = TIMER_CONTEXT_EPOLL_WAITING; + g_timerFdCtxMap[timerFd].cb = cb; + g_timerFdCtxMap[timerFd].args = args; + g_timerFdCtxMap[timerFd].fd = (uint32_t)timerFd; + + if (LIKELY(time->it_interval.tv_sec > 0 || time->it_interval.tv_nsec > 0)) { + g_timerFdCtxMap[timerFd].periodical = 1; + } + +#if defined(OS_LINUX) + struct epoll_event event = { + .events = EPOLLIN, + .data = {.fd = timerFd} + }; + + int32_t ret = epoll_ctl(g_epollFd, EPOLL_CTL_ADD, timerFd, &event); +#elif defined(OS_MACOSX) + struct kevent event; + uint64_t timeout_nsec = time->it_value.tv_sec * 1000000000ULL + time->it_value.tv_nsec; + uint64_t interval_nsec = time->it_interval.tv_sec * 1000000000ULL + time->it_interval.tv_nsec; + EV_SET(&event, timerFd, EVFILT_TIMER, EV_ADD | EV_ENABLE, 0, + timeout_nsec / 1000000, NULL); + int32_t ret = kevent(g_epollFd, &event, 1, NULL, 0, NULL); +#endif + + if (UNLIKELY(ret != 0)) { + CloseTimerFd((uint32_t)timerFd); + LOG(ERROR) << "Failed to add event to epoll/kqueue. 
errno=" << errno; + return -1; + } + + atomic_fetch_add(&g_totalTimerNum, 1); + +#if defined(OS_LINUX) + ret = timerfd_settime(timerFd, 0, time, NULL); +#elif defined(OS_MACOSX) + ret = timerfd_settime_macosx(timerFd, 0, time, NULL); +#endif + + if (UNLIKELY(ret != 0)) { +#if defined(OS_LINUX) + if (epoll_ctl(g_epollFd, EPOLL_CTL_DEL, timerFd, NULL) != 0) { +#elif defined(OS_MACOSX) + struct kevent evt; + EV_SET(&evt, timerFd, EVFILT_TIMER, EV_DELETE, 0, 0, NULL); + if (kevent(g_epollFd, &evt, 1, NULL, 0, NULL) != 0) { +#endif + LOG(ERROR) << "Failed to delete the timer fd=" << timerFd << " with errno=" << errno; + } + CloseTimerFd((uint32_t)timerFd); + atomic_fetch_sub(&g_totalTimerNum, 1); + LOG(ERROR) << "Failed to set timer"; + return -1; + } + + return timerFd; +} + +uint32_t GetActiveTimerNum(void) { + return atomic_load(&g_totalTimerNum); +} + +void CloseTimerFd(uint32_t fd) { Review Comment: uint32_t -> int ########## src/brpc/ubshm/common/thread_lock.h: ########## @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef BRPC_THREAD_LOCK_H +#define BRPC_THREAD_LOCK_H +#include <unistd.h> +#include <stdlib.h> +#include <fcntl.h> +#include <semaphore.h> +#include <pthread.h> +#include "brpc/ubshm/common/common.h" + +#ifdef __cplusplus +extern "C" { +#endif + +static inline void UnlockMutex(pthread_mutex_t **mtx) Review Comment: The functions and macros defined in this file are not used; it is recommended to remove them. ########## src/brpc/ubshm/common/thread_lock.h: ########## @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_THREAD_LOCK_H +#define BRPC_THREAD_LOCK_H +#include <unistd.h> +#include <stdlib.h> +#include <fcntl.h> +#include <semaphore.h> +#include <pthread.h> +#include "brpc/ubshm/common/common.h" + +#ifdef __cplusplus +extern "C" { +#endif + +static inline void UnlockMutex(pthread_mutex_t **mtx) +{ + if (LIKELY(mtx != NULL && *mtx != NULL)) { + pthread_mutex_unlock(*mtx); + } else { + LOG(ERROR) << "Invalid input for mtx."; + } +} + +#define LOCK_GUARD(mtxPtr) \ + pthread_mutex_t *__attribute__((cleanup(UnlockMutex))) _mtxPtr = ({ \ + pthread_mutex_lock(&(mtxPtr)); \ + &(mtxPtr); \ + }) Review Comment: Use BAIDU_SCOPED_LOCK or std::lock_guard instead.
########## src/brpc/ubshm/ub_ring.cpp: ########## @@ -0,0 +1,1091 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <errno.h> +#include <iostream> +#include <gflags/gflags.h> +#include <unistd.h> +#include <ctime> +#include "bthread/bthread.h" +#include "butil/logging.h" +#include "brpc/ubshm/ub_ring.h" +#include "brpc/ubshm/ub_ring_manager.h" +#include "brpc/ubshm/shm/shm_ipc.h" + +namespace brpc { +namespace ubring { +uint32_t g_sleepTime[UBR_TASK_STEP_NUM] = {0}; +#define TIME_COVERSION 1000 +DEFINE_int32(ub_disconnect_timeout, 5, "Ubshm disconnection timeout."); +DEFINE_int32(ub_connect_timeout, 1, "Ubshm connection timeout."); +DEFINE_int32(ub_hb_timer_interval, 5, "Heartbeat timer interval."); +DEFINE_int32(ub_hb_retry_cnt, 10, "Heartbeat retry times."); +DEFINE_int32(ub_event_queue_timer_interval, 100, "Interval of the disconnection timer."); + +UBRing::UBRing() +{} +UBRing::~UBRing() +{} + +RETURN_CODE UBRing::UbrTrxMapShm(SHM *localShm, SHM *remoteShm) +{ + RETURN_CODE rc = UbrTrxMapLocalShm(localShm); + if (UNLIKELY(rc != UBRING_OK)) { + LOG(ERROR) << "Trx map local shared memory failed."; + return rc; + } + rc = UbrTrxMapRemoteShm(remoteShm); + if (UNLIKELY(rc != UBRING_OK)) { + LOG(ERROR) << 
"Trx map remote shared memory failed."; + return rc; + } + return UBRING_OK; +} + +RETURN_CODE UBRing::UbrTrxClose() { + RETURN_CODE closeCheckRc = UbrTrxCloseCheck(_trx); + if (UNLIKELY(closeCheckRc != UBRING_OK)) { + if (closeCheckRc == UBRING_REENTRY) { + LOG(INFO) << "Trx close skipped, already closing, local name=" << _trx->localShm.name; + return UBRING_OK; + } + return UBRING_ERR; + } + if (_trx->ubrRx.remoteTxEventQ.addr != nullptr) { + ((UbrEventQMsg *)_trx->ubrRx.remoteTxEventQ.addr)->flag = UBR_STATE_CLOSING; + } + + uint32_t disconnectTimeout = FLAGS_ub_disconnect_timeout; + uint64_t startTime = GetCurNanoSeconds(); + + if (_trx->ubrTx.localTxEventQ.addr != nullptr && ((UbrEventQMsg *)_trx->ubrTx.localTxEventQ.addr)->flag == UBR_STATE_CONNECTED) { + ((UbrEventQMsg *)_trx->ubrTx.localTxEventQ.addr)->flag = UBR_STATE_CLOSED; + _trx->ubrTx.trxState = UBR_STATE_CLOSED; + } + + if (_trx->ubrTx.remoteRxEventQ.addr != nullptr) { + ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->flag = UBR_STATE_CLOSED; + } + while (_trx->ubrRx.localRxEventQ.addr != nullptr && ((UbrEventQMsg *)_trx->ubrRx.localRxEventQ.addr)->flag != UBR_STATE_CLOSED) { + UbrSetSleepTask(UBR_TASK_CLOSE); + if (HasTimedOut(startTime, disconnectTimeout) != UBRING_OK) { + LOG(WARNING) << "Local shm " << _trx->localShm.name + << " wait for the peer to close timed out, force cleanup."; + _trx->ubrRx.trxState = UBR_STATE_CLOSED; + // Force synchronous cleanup instead of relying on async timer + DeleteTimerSafe((uint32_t)_trx->timerFd); + DeleteTimerSafe((uint32_t)_trx->hbTimerFd); + if (_trx->ubrTx.remoteRxEventQ.addr != nullptr) { + ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->flag = UBR_STATE_CLOSED; + } + if (UNLIKELY(UbrTrxFreeShm(_trx) != UBRING_OK)) { + LOG(WARNING) << "Force close, local shm " << _trx->localShm.name << " free failed."; + } + if (UNLIKELY(UBRingManager::ReleaseUbrTrxFromMgr(_trx) != UBRING_OK)) { + LOG(WARNING) << "Force close, release trx " << _trx->localShm.name << " 
failed."; + } + return UBRING_ERR_TIMEOUT; + } + bthread_usleep(1000); // 1ms, yield to other bthreads + } + _trx->ubrRx.trxState = UBR_STATE_CLOSED; + RETURN_CODE rc; + if (UNLIKELY((rc = ClearTrxResource(_trx, startTime, UBR_SEND_CLOSE)) != UBRING_OK)) { + if (rc == UBRING_REENTRY) { + LOG(INFO) << "Trx close, peer is closing, trx local name=" << _trx->localShm.name; + return UBRING_OK; + } + LOG(ERROR) << "Trx close, clear trx resource failed, trx local name=" << _trx->localShm.name; + return UBRING_ERR; + } + // Unlink local shm name immediately so process exit does not leave visible leftovers. + RETURN_CODE unlinkRc = ShmFree(&_trx->localShm); + if (unlinkRc != UBRING_OK && unlinkRc != SHM_ERR_NOT_FOUND && unlinkRc != SHM_ERR_RESOURCE_ATTACHED) { + LOG(WARNING) << "Trx close, unlink local shm failed, trx local name=" << _trx->localShm.name + << ", rc=" << unlinkRc; + } + return UBRING_OK; +} + +RETURN_CODE UBRing::UbrAddCloseTimer() { + if (UNLIKELY(_trx == NULL)) { + LOG(ERROR) << "Trx add close timer failed, trx is null."; + return UBRING_ERR; + } + + uint32_t eventQTimerInterval = FLAGS_ub_event_queue_timer_interval * TIME_COVERSION; + itimerspec timeSpec = { + .it_interval = {.tv_sec = 0, .tv_nsec = eventQTimerInterval}, + .it_value = {.tv_sec = 0, .tv_nsec = 1} + }; + int timerFd = TimerStart(&timeSpec, UbrTrxCloseCallback, (void*)_trx); + if (UNLIKELY(timerFd == -1)) { + LOG(ERROR) << "Start ubr close timer failed, trx local name=" << _trx->localShm.name; + return UBRING_ERR; + } + _trx->timerFd = timerFd; + return UBRING_OK; +} + +RETURN_CODE UBRing::UbrAddTimer() { + if (UNLIKELY(UbrAddCloseTimer() != UBRING_OK)) { + LOG(ERROR) << "Ubr " << _trx->localShm.name << " add closed timer failed."; + return UBRING_ERR; + } + + if (UNLIKELY(UbrAddHBTimer() != UBRING_OK)) { + DeleteTimerSafe((uint32_t)_trx->timerFd); + LOG(ERROR) << "Ubr " << _trx->localShm.name << " add heartbeat timer failed."; + return UBRING_ERR; + } + return UBRING_OK; +} + +void* 
UBRing::UbrTrxCloseCallback(void* args) { + auto* trx = (UbrTrx*) args; + if (UNLIKELY(UBRing::UbrTrxCallbackCheck(trx) != UBRING_OK)) { + return nullptr; + } + + auto* localRxEventQ = (UbrEventQMsg *)trx->ubrRx.localRxEventQ.addr; + auto* localTxEventQ = (UbrEventQMsg *)trx->ubrTx.localTxEventQ.addr; + if (localRxEventQ->flag != UBR_STATE_CLOSED || localTxEventQ->flag == UBR_STATE_CLOSED) { + return nullptr; + } + trx->ubrRx.trxState = UBR_STATE_CLOSED; + int fd = (int)trx->localShm.fd; + do { + if (ATOMIC_LOAD(trx->closeCnt) == 0) { + break; + } + ATOMIC_SUB(trx->closeCnt, 1); + + uint64_t startTime = GetCurNanoSeconds(); + + if (localTxEventQ->flag == UBR_STATE_CONNECTED || ATOMIC_LOAD(trx->closeCnt) == 1) { + localTxEventQ->flag = UBR_STATE_CLOSED; + trx->ubrTx.trxState = UBR_STATE_CLOSED; + } + UbrEventQMsg* remoteRxEventQ = (UbrEventQMsg *)trx->ubrTx.remoteRxEventQ.addr; + if (remoteRxEventQ == nullptr) { + LOG(ERROR) << "Trx close callback failed, " << trx->localShm.name << " remoteRxEventQ is NULL."; + break; + } + remoteRxEventQ->flag = UBR_STATE_CLOSED; + RETURN_CODE clearRc = ClearTrxResource(trx, startTime, UBR_CALL_BACK_CLOSE, 1); + if (UNLIKELY(clearRc != UBRING_OK && clearRc != UBRING_REENTRY)) { + LOG(ERROR) << "Trx close callback failed, " << trx->localShm.name << " clear trx resource failed."; + break; + } + } while (0); + return nullptr; +} + +RETURN_CODE UBRing::UbrAddHBTimer() { + if (UNLIKELY(_trx == NULL)) { + LOG(ERROR) << "Trx add heartbeat timer failed, trx is null."; + return UBRING_ERR; + } + + itimerspec timeSpec = { + .it_interval = {.tv_sec = FLAGS_ub_hb_timer_interval, .tv_nsec = 0}, + .it_value = {.tv_sec = 0, .tv_nsec = 1} + }; + int timerFd = TimerStart(&timeSpec, UbrTrxHBCallback, (void*)_trx); + if (UNLIKELY(timerFd == -1)) { + LOG(ERROR) << "Start ubr heartbeat timer failed."; + return UBRING_ERR; + } + _trx->hbTimerFd = timerFd; + return UBRING_OK; +} + +RETURN_CODE UBRing::UbrPassiveClearTrx(UbrTrx *trx, int fd, 
PASSIVE_DISC_TYPE type) { + RETURN_CODE passiveCloseCheckRc = UbrTrxCloseCheck(trx); + if (UNLIKELY(passiveCloseCheckRc != UBRING_OK)) { + if (passiveCloseCheckRc == UBRING_REENTRY) { + LOG(INFO) << "Passive close skipped, active close in progress, name=" << trx->localShm.name; + uint64_t startTime = GetCurNanoSeconds(); + return ClearTrxResource(trx, startTime, UBR_CALL_BACK_CLOSE); + } + return UBRING_ERR; + } + trx->ubrTx.trxState = UBR_STATE_CLOSED; + trx->ubrRx.trxState = UBR_STATE_CLOSED; + DeleteTimerSafe((uint32_t)trx->timerFd); + const char *typeName = NULL; + if (type == UBR_HEARTBEAT) { + DeleteTimer((uint32_t)trx->hbTimerFd); + typeName = "Trx heartbeat"; + } else if (type == UBR_UB_EVENT) { + DeleteTimerSafe((uint32_t)trx->hbTimerFd); + typeName = "Ub event callback"; + } + bthread_usleep(FLAGS_ub_flying_io_timeout * 1000000LL); // yield-friendly sleep + + int rc = ShmLocalFree(&trx->remoteShm); + if (rc != UBRING_OK) { + LOG(ERROR) << typeName << ", delete remote shm failed. ret=" << rc; + } + rc = ShmLocalFree(&trx->localShm); + if (rc != UBRING_OK) { + LOG(ERROR) << typeName << ", delete local shm failed. 
ret=" << rc; + } + + UBRingManager::ReleaseUbrTrxFromMgr(trx); + return UBRING_OK; +} + +void* UBRing::UbrTrxHBCallback(void* args) { + auto* trx = (UbrTrx*) args; + if (UNLIKELY(UbrTrxCallbackCheck(trx) != UBRING_OK)) { + return NULL; + } + + auto* localDataStatus = (UbrDataStatusQMsg *)trx->ubrTx.localDataStatusQ.addr; + auto* remoteDataStatus = (UbrDataStatusQMsg *)trx->ubrRx.remoteDataStatusQ.addr; + if (UNLIKELY(localDataStatus == NULL || remoteDataStatus == NULL)) { + LOG(ERROR) << "Heartbeat error, datastatus is NULL."; + return NULL; + } + + if (trx->ubrTx.trxState != UBR_STATE_CONNECTED || trx->ubrRx.trxState != UBR_STATE_CONNECTED) { + LOG_EVERY_SECOND(INFO) << "Heartbeat cannot be started, wait connected state."; + return NULL; + } + + remoteDataStatus->heartBeat = 1; + if (localDataStatus->heartBeat == 1) { + localDataStatus->heartBeat = 0; + trx->ubrTx.hbRetryCnt = 0; + return NULL; + } + + ++trx->ubrTx.hbRetryCnt; + if (trx->ubrTx.hbRetryCnt <= FLAGS_ub_hb_retry_cnt) { + return NULL; + } + + int fd = (int)trx->localShm.fd; + LOG(INFO) << "Hlc heartbeat, start to clear trx resource. 
hbTimerFd=" << fd << ", shmName=" << trx->localShm.name; + UbrPassiveClearTrx(trx, fd, UBR_HEARTBEAT); + LOG(INFO) << "Hlc heartbeat clear trx resource finish."; + return NULL; +} + +RETURN_CODE UBRing::UbrAddAsynClearTimer(UbrTrx *trx) { + if (UNLIKELY(trx == NULL)) { + LOG(ERROR) << "Trx add close timer failed, trx is null."; + return UBRING_ERR; + } + + if (trx->clearTimerFd > 0) { + return UBRING_OK; + } + + itimerspec timeSpec = { + .it_interval = {.tv_sec = 0, .tv_nsec = 0}, + .it_value = {.tv_sec = FLAGS_ub_flying_io_timeout, .tv_nsec = 0} + }; + + int timerFd = TimerStart(&timeSpec, UbrAsynClearCallback, (void*)trx); + if (UNLIKELY(timerFd == -1)) { + LOG(ERROR) << "Start ubr close timer failed, trx name=%s.", trx->localShm.name; + return UBRING_ERR; + } + trx->clearTimerFd = timerFd; + return UBRING_OK; +} + +void *UBRing::UbrAsynClearCallback(void *args) +{ + auto* trx = (UbrTrx*) args; + if (UNLIKELY(trx == NULL)) { + LOG(ERROR) << "Trx close, trx is null."; + return NULL; + } + + if (UNLIKELY(UbrTrxFreeShm(trx) != UBRING_OK)) { + LOG(ERROR) << "Trx close, wait for local shm " << trx->localShm.name << " free fail."; + } + + if (UNLIKELY(UBRingManager::ReleaseUbrTrxFromMgr(trx) != UBRING_OK)) { + LOG(ERROR) << "Trx close, release shm " << trx->localShm.name << " trx failed."; + } + return NULL; +} + +int UBRing::UbrTrxSend(const void *buf, uint32_t bufLen) +{ + if (UNLIKELY(CheckTrxSendPreCheck(_trx) != UBRING_OK)) { + return UBRING_ERR; + } + // 1.2 计算空间 + auto *dataStatusMsg = (UbrDataStatusQMsg *)_trx->ubrTx.localDataStatusQ.addr; + auto *dataMsg = (UbrMsgFormat *)_trx->ubrTx.remoteDataQ.addr; + uint32_t cap = _trx->ubrTx.capacity; + uint32_t tail = dataStatusMsg->tail; + uint32_t remainChunkNum = + (_trx->ubrTx.writePos > tail) ? 
(tail + cap - _trx->ubrTx.writePos) : (tail - _trx->ubrTx.writePos); + uint32_t needMsgChunkNum = CalcUbrMsgChunkCnt(bufLen); + if (needMsgChunkNum >= cap) { + LOG(ERROR) << "Ubr send failed, payload length=" << bufLen + << " needs " << needMsgChunkNum << " chunks, capacity=" << cap << "."; + errno = EMSGSIZE; + return UBRING_ERR; + } + if (remainChunkNum < needMsgChunkNum) { + return UBRING_RETRY; + } + UbrMsgFormat *msg = &(_trx->ubrTx.localMsgSpace); + uint32_t totalSendLen = 0; + uint32_t remainBufLen = bufLen; + uint8_t isLastPkt = 0; + _trx->ubrTx.outIoId++; + ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->ioId = _trx->ubrTx.outIoId; + while (remainBufLen > 0) { + isLastPkt = (uint8_t)(remainBufLen <= UBR_MSG_PAYLOAD_LEN); + msg->header[UBR_MSG_FLAG_INDEX] = isLastPkt ? UBR_MSG_CHUNK_EOF : UBR_MSG_CHUNK_EXIST; + msg->header[UBR_MSG_LEN_INDEX] = isLastPkt ? (uint8_t)remainBufLen : UBR_MSG_PAYLOAD_LEN; + msg->header[UBR_MSG_CUR_INDEX] = 0; + memcpy(msg->payload.inner, (const uint8_t *)buf + totalSendLen, msg->header[UBR_MSG_LEN_INDEX]); + Copy64Byte((int8_t *)&dataMsg[_trx->ubrTx.writePos], (int8_t *)msg); + _trx->ubrTx.writePos = (_trx->ubrTx.writePos + 1) % cap; + totalSendLen += msg->header[UBR_MSG_LEN_INDEX]; + remainBufLen -= msg->header[UBR_MSG_LEN_INDEX]; + } + return (int)totalSendLen; +} + +int UBRing::UbrTrxRecv(void *buf, uint32_t bufLen) +{ + RETURN_CODE rc = UBRING_OK; + if (UNLIKELY((rc = CheckTrxRecvParam(_trx, buf, bufLen)) != UBRING_OK)) { + return (rc == UBR_NOT_CONNECTED) ? 
0 : rc; + } + UbrMsgFormat *dataMsg = (UbrMsgFormat *)_trx->ubrRx.localDataQ.addr; + uint32_t readPosEnd = _trx->ubrRx.readPos; + uint8_t flag = dataMsg[readPosEnd].header[UBR_MSG_FLAG_INDEX]; + if (flag == UBR_MSG_CHUNK_NONE) { + return UBRING_RETRY; + } + return UbrTrxRecvBlockMode(static_cast<uint8_t *>(buf), bufLen); +} + +int UBRing::UbrTrxRecvBlockMode(uint8_t *dest, uint32_t bufLen) +{ + RETURN_CODE rc = UBRING_OK; + if (UNLIKELY((rc = CheckTrxRecvParam(_trx, dest, bufLen)) != UBRING_OK)) { + return (rc == UBR_NOT_CONNECTED) ? 0 : rc; + } + + int32_t totalCopied = 0; + int32_t remainingLen = (int32_t)bufLen; + bool notEofEncountered = true; + + UbrRx *ubrRx = &_trx->ubrRx; + UbrMsgFormat *dataMsg = (UbrMsgFormat *)ubrRx->localDataQ.addr; + bool needUpdateEpollEofPos = ubrRx->readPos == ubrRx->epEofPos; + + while (notEofEncountered && remainingLen > 0) { + if (UNLIKELY(CheckTrxRecvPreCheck(_trx) != UBRING_OK)) { + return UBRING_ERR; + } + UbrMsgFormat *currentChunk = &dataMsg[ubrRx->readPos]; + uint8_t flag = currentChunk->header[UBR_MSG_FLAG_INDEX]; + if (flag == UBR_MSG_CHUNK_NONE) { + if (totalCopied > 0) { + break; + } + errno = EAGAIN; + return -1; + } + if (flag == UBR_MSG_CHUNK_EOF) { + notEofEncountered = false; + } + uint8_t chunkMsgLen = currentChunk->header[UBR_MSG_LEN_INDEX]; + uint8_t curIndex = currentChunk->header[UBR_MSG_CUR_INDEX]; + uint8_t availableData = chunkMsgLen - curIndex; + + int32_t copyLen = (remainingLen < availableData) ? 
remainingLen : availableData; + memcpy(dest + totalCopied, dataMsg[ubrRx->readPos].payload.inner + curIndex, (size_t)copyLen); + totalCopied += copyLen; + remainingLen -= copyLen; + currentChunk->header[UBR_MSG_CUR_INDEX] += (uint8_t)copyLen; + if (LIKELY(currentChunk->header[UBR_MSG_CUR_INDEX] == chunkMsgLen)) { + currentChunk->header[UBR_MSG_FLAG_INDEX] = UBR_MSG_CHUNK_NONE; + UpdateDataQTail(_trx); + ubrRx->readPos = (ubrRx->readPos + 1) % ubrRx->capacity; + } + } + if (needUpdateEpollEofPos) { + ubrRx->epEofPos = ubrRx->readPos; + } + return (int)totalCopied; +} + +ssize_t UBRing::UbrTrxWritev(const struct iovec *iov, int iovcnt) +{ + if (UNLIKELY(CheckTrxSendPreCheck(_trx) != UBRING_OK)) { + return UBRING_ERR; + } + + size_t bufLen = 0; + for (int i = 0; i < iovcnt; i++) { + bufLen += iov[i].iov_len; + } + RETURN_CODE rc = WritevHasEnoughSpace(bufLen); + if (rc != UBRING_OK) { + return rc; + } + + UbrMsgFormat *dataMsg = (UbrMsgFormat *)_trx->ubrTx.remoteDataQ.addr; + UbrMsgFormat *msg = &(_trx->ubrTx.localMsgSpace); + int curIov = 0; + size_t curIovPos = 0; + ssize_t totalSendLen = 0; + size_t pktRemainN = 0; + size_t iovRemain = 0; + size_t fulled = 0; + uint8_t isLastPkt = 0; + uint8_t curPktLen = 0; + _trx->ubrTx.outIoId++; + ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->ioId = _trx->ubrTx.outIoId; + while (bufLen > 0) { + isLastPkt = (uint8_t)(bufLen <= UBR_MSG_PAYLOAD_LEN); + curPktLen = isLastPkt ? (uint8_t)bufLen : UBR_MSG_PAYLOAD_LEN; + msg->header[UBR_MSG_FLAG_INDEX] = isLastPkt ? UBR_MSG_CHUNK_EOF : UBR_MSG_CHUNK_EXIST; + msg->header[UBR_MSG_LEN_INDEX] = curPktLen; + msg->header[UBR_MSG_CUR_INDEX] = 0; + pktRemainN = curPktLen; + while (curIov < iovcnt && pktRemainN > 0) { + iovRemain = (iov[curIov].iov_len - curIovPos); + fulled = iovRemain > pktRemainN ? 
pktRemainN : iovRemain; + memcpy((msg->payload.inner + (curPktLen - (uint8_t)pktRemainN)), + (uint8_t *)(iov[curIov].iov_base) + curIovPos, + fulled); + pktRemainN -= fulled; + curIovPos += fulled; + if (curIovPos == iov[curIov].iov_len) { + curIov++; + curIovPos = 0; + } + } + + Copy64Byte((int8_t *)&dataMsg[_trx->ubrTx.writePos], (int8_t *)msg); + _trx->ubrTx.writePos = (_trx->ubrTx.writePos + 1) % _trx->ubrTx.capacity; + totalSendLen += (ssize_t)curPktLen; + bufLen -= (int)curPktLen; + } + return totalSendLen; +} + +ssize_t UBRing::UbrTrxReadv(const struct iovec *iov, int iovcnt) +{ + RETURN_CODE rc = UBRING_OK; + if (UNLIKELY((rc = CheckTrxRecvParam(_trx, iov, (uint32_t)iovcnt)) != UBRING_OK)) { + return (rc == UBR_NOT_CONNECTED) ? 0 : rc; + } + UbrMsgFormat *dataMsg = (UbrMsgFormat *)_trx->ubrRx.localDataQ.addr; + uint32_t readPosEnd = _trx->ubrRx.readPos; + uint8_t flag = dataMsg[readPosEnd].header[UBR_MSG_FLAG_INDEX]; + if (flag == UBR_MSG_CHUNK_NONE) { + errno = EAGAIN; + return -1; + } + ssize_t nr = UbrTrxReadvBlockMode(iov, iovcnt); + if (UNLIKELY(nr == -1)) { + LOG(ERROR) << "Non-blocking readv msg in failed, connection has been closed."; + errno = EPIPE; + return -1; + } + return nr; +} + +ssize_t UBRing::UbrTrxReadvBlockMode(const struct iovec *iov, int iovcnt) +{ + RETURN_CODE rc = UBRING_OK; + if (UNLIKELY((rc = CheckTrxRecvParam(_trx, iov, (uint32_t)iovcnt)) != UBRING_OK)) { + return (rc == UBR_NOT_CONNECTED) ? 
0 : rc; + } + + size_t remainBufLen = 0; + for (int i = 0; i < iovcnt; i++) { + remainBufLen += iov[i].iov_len; + } + + bool needUpdateEpollEofPos = _trx->ubrRx.readPos == _trx->ubrRx.epEofPos; + ssize_t totalRecvLen = StartReadv(_trx, iov, iovcnt, remainBufLen); + + if (needUpdateEpollEofPos) { + _trx->ubrRx.epEofPos = _trx->ubrRx.readPos; + } + return totalRecvLen; +} + +RETURN_CODE UBRing::IsUbrTrxReadable(uint32_t epEvent) +{ + if (UNLIKELY(_trx == NULL)) { + LOG(ERROR) << "The trx to be checked is NULL."; + return UBRING_ERR; + } + if (UNLIKELY(_trx->localShm.addr == NULL)) { + LOG(ERROR) << "The trx localShm to be checked is NULL."; + return UBRING_ERR; + } + if (UNLIKELY(_trx->ubrTx.trxState != UBR_STATE_CONNECTED)) { + // TODO mwj 这几块的日志是否需要删除 + // LOG(ERROR) << "The trx is not connected state."; Review Comment: Unnecessary comments, please delete. ########## src/brpc/ubshm/ub_endpoint.cpp: ########## @@ -0,0 +1,943 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#if BRPC_WITH_UBRING + +#include <errno.h> + +#include <gflags/gflags.h> +#include <array> +#include "butil/fd_utility.h" +#include "butil/logging.h" // CHECK, LOG +#include "butil/sys_byteorder.h" // HostToNet,NetToHost +#include "bthread/bthread.h" +#include "brpc/errno.pb.h" +#include "brpc/event_dispatcher.h" +#include "brpc/input_messenger.h" +#include "brpc/socket.h" +#include "brpc/reloadable_flags.h" +#include "brpc/ubshm/ub_helper.h" +#include "brpc/ubshm/ub_endpoint.h" +#include "brpc/ubshm/shm/shm_def.h" +#include "brpc/ubshm/common/common.h" +#include "brpc/ubshm_transport.h" +#include "brpc/ubshm/ubr_trx.h" + +DECLARE_int32(task_group_ntags); + +namespace brpc { +DECLARE_bool(log_connection_close); +namespace ubring { + +extern bool g_skip_ub_init; +DEFINE_int32(data_queue_size, 4, "data queue size for UB"); +DEFINE_bool(ub_trace_verbose, false, "Print log message verbosely"); +BRPC_VALIDATE_GFLAG(ub_trace_verbose, brpc::PassValidate); +DEFINE_int32(ub_poller_num, 1, "Poller number in ub polling mode."); +DEFINE_bool(ub_poller_yield, false, "Yield thread in RDMA polling mode."); +DEFINE_bool(ub_edisp_unsched, false, "Disable event dispatcher schedule"); +DEFINE_bool(ub_disable_bthread, false, "Disable bthread in RDMA"); + +static const size_t MIN_ONCE_READ = 4096; +static const size_t MAX_ONCE_READ = 524288; +static const size_t IOBUF_IOV_MAX = 256; + +static const char* MAGIC_STR = "UB"; +static const size_t MAGIC_STR_LEN = 2; +static const size_t HELLO_MSG_LEN_MIN = 64; +static const size_t ACK_MSG_LEN = 4; +static uint16_t g_ub_hello_msg_len = 64; +static uint16_t g_ub_hello_version = 2; +static uint16_t g_ub_impl_version = 1; + +static const uint32_t ACK_MSG_UB_OK = 0x1; + +static butil::Mutex* g_ubring_resource_mutex = NULL; + +struct HelloMessage { + void Serialize(void* data) const; + void Deserialize(void* data); + std::string toString() const; + + uint16_t msg_len; + uint16_t hello_ver; + uint16_t impl_ver; + uint64_t len; + char 
shm_name[SHM_MAX_NAME_BUFF_LEN]; +}; + +void HelloMessage::Serialize(void* data) const { + char* current_pos = static_cast<char*>(data); + const uint16_t net_msg_len = butil::HostToNet16(msg_len); + memcpy(current_pos, &net_msg_len, sizeof(net_msg_len)); + current_pos += sizeof(net_msg_len); + const uint16_t net_hello_ver = butil::HostToNet16(hello_ver); + memcpy(current_pos, &net_hello_ver, sizeof(net_hello_ver)); + current_pos += sizeof(net_hello_ver); + const uint16_t net_impl_ver = butil::HostToNet16(impl_ver); + memcpy(current_pos, &net_impl_ver, sizeof(net_impl_ver)); + current_pos += sizeof(net_impl_ver); + const uint64_t net_len = butil::HostToNet64(len); + memcpy(current_pos, &net_len, sizeof(net_len)); + current_pos += sizeof(net_len); + memcpy(current_pos, shm_name, SHM_MAX_NAME_BUFF_LEN); +} + +void HelloMessage::Deserialize(void* data) { + char* current_pos = static_cast<char*>(data); + uint16_t net_msg_len; + memcpy(&net_msg_len, current_pos, sizeof(net_msg_len)); + msg_len = butil::NetToHost16(net_msg_len); + current_pos += sizeof(net_msg_len); + uint16_t net_hello_ver; + memcpy(&net_hello_ver, current_pos, sizeof(net_hello_ver)); + hello_ver = butil::NetToHost16(net_hello_ver); + current_pos += sizeof(net_hello_ver); + uint16_t net_impl_ver; + memcpy(&net_impl_ver, current_pos, sizeof(net_impl_ver)); + impl_ver = butil::NetToHost16(net_impl_ver); + current_pos += sizeof(net_impl_ver); + uint64_t net_len; + memcpy(&net_len, current_pos, sizeof(net_len)); + len = butil::NetToHost64(net_len); + current_pos += sizeof(net_len); + memcpy(shm_name, current_pos, SHM_MAX_NAME_BUFF_LEN); +} + +std::string HelloMessage::toString() const { + constexpr size_t MAX_LEN = 16 + 6 + 16 + 6 + 16 + 6 + 20 + 6 + SHM_MAX_NAME_BUFF_LEN + 32; + std::array<char, MAX_LEN> buf; + int n = snprintf(buf.data(), buf.size(), + "msg_len=%u, hello_ver=%u, impl_ver=%u, len=%lu, shm_name=%.*s", + msg_len, + hello_ver, + impl_ver, + static_cast<unsigned long>(len), // 兼容32/64位 + 
static_cast<int>(SHM_MAX_NAME_BUFF_LEN), // 限制最大输出长度 + shm_name + ); + return std::string(buf.data(), static_cast<size_t>(n)); +} + +UBShmEndpoint::UBShmEndpoint(Socket* s) + : _socket(s) + , _state(UNINIT) + , _ub_ring(nullptr) + , _cq_sid(INVALID_SOCKET_ID) +{ + _read_butex = bthread::butex_create_checked<butil::atomic<int>>(); +} + +UBShmEndpoint::~UBShmEndpoint() { + Reset(); + bthread::butex_destroy(_read_butex); +} + +void UBShmEndpoint::Reset() { + DeallocateResources(); + + delete _ub_ring; + _ub_ring = nullptr; + _cq_sid = INVALID_SOCKET_ID; + _state = UNINIT; +} + +void UBConnect::StartConnect(const Socket* socket, + void (*done)(int err, void* data), + void* data) { + auto* ub_transport = static_cast<UBShmTransport*>(socket->_transport.get()); + CHECK(ub_transport->_ub_ep != NULL); + SocketUniquePtr s; + if (Socket::Address(socket->id(), &s) != 0) { + return; + } + if (!IsUBAvailable()) { + ub_transport->_ub_ep->_state = UBShmEndpoint::FALLBACK_TCP; + ub_transport->_ub_state = UBShmTransport::UB_OFF; + done(0, data); + return; + } + _done = done; + _data = data; + bthread_t tid; + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "UBProcessHandshakeAtClient"); + if (bthread_start_background(&tid, &attr, + UBShmEndpoint::ProcessHandshakeAtClient, ub_transport->_ub_ep) < 0) { + LOG(FATAL) << "Fail to start handshake bthread"; + Run(); + } else { + s.release(); + } +} + +void UBConnect::StopConnect(Socket* socket) { } + +void UBConnect::Run() { + _done(errno, _data); +} + +static void TryReadOnTcpDuringRdmaEst(Socket* s) { + int progress = Socket::PROGRESS_INIT; + while (true) { + uint8_t tmp; + ssize_t nr = read(s->fd(), &tmp, 1); + if (nr < 0) { + if (errno != EAGAIN) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to read from " << s; + s->SetFailed(saved_errno, "Fail to read from %s: %s", + s->description().c_str(), berror(saved_errno)); + return; + } + if (!s->MoreReadEvents(&progress)) { + break; + } + } else if (nr 
== 0) { + s->SetEOF(); + return; + } else { + LOG(WARNING) << "Read unexpected data from " << s; + s->SetFailed(EPROTO, "Read unexpected data from %s", + s->description().c_str()); + return; + } + } +} + +void UBShmEndpoint::OnNewDataFromTcp(Socket* m) { + auto* ub_transport = static_cast<UBShmTransport*>(m->_transport.get()); + UBShmEndpoint* ep = ub_transport->GetUBShmEp(); + CHECK(ep != NULL); + + int progress = Socket::PROGRESS_INIT; + while (true) { + if (ep->_state == UNINIT) { + if (!m->CreatedByConnect()) { + if (!IsUBAvailable()) { + ep->_state = FALLBACK_TCP; + ub_transport->_ub_state = UBShmTransport::UB_OFF; + continue; + } + bthread_t tid; + ep->_state = S_HELLO_WAIT; + SocketUniquePtr s; + m->ReAddress(&s); + bthread_attr_t attr = BTHREAD_ATTR_NORMAL; + bthread_attr_set_name(&attr, "UBProcessHandshakeAtServer"); + if (bthread_start_background(&tid, &attr, + ProcessHandshakeAtServer, ep) < 0) { + ep->_state = UNINIT; + LOG(FATAL) << "Fail to start handshake bthread"; + } else { + s.release(); + } + } else { + // The connection may be closed or reset before the client + // starts handshake. This will be handled by client handshake. + // Ignore the exception here. 
+ } + } else if (ep->_state < ESTABLISHED) { // during handshake + ep->_read_butex->fetch_add(1, butil::memory_order_release); + bthread::butex_wake(ep->_read_butex); + } else if (ep->_state == FALLBACK_TCP){ // handshake finishes + InputMessenger::OnNewMessages(m); + return; + } else if (ep->_state == ESTABLISHED) { + TryReadOnTcpDuringRdmaEst(ep->_socket); + return; + } + if (!m->MoreReadEvents(&progress)) { + break; + } + } +} +bool HelloNegotiationValid(HelloMessage& msg) { + if (msg.hello_ver == g_ub_hello_version && + msg.impl_ver == g_ub_impl_version) { + // This can be modified for future compatibility + return true; + } + return false; +} + +static const int WAIT_TIMEOUT_MS = 50; + +int UBShmEndpoint::ReadFromFd(void* data, size_t len) { + CHECK(data != NULL); + int nr = 0; + size_t received = 0; + do { + const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS); + nr = read(_socket->fd(), (uint8_t*)data + received, len - received); + if (nr < 0) { + if (errno == EAGAIN) { + const int expected_val = _read_butex->load(butil::memory_order_acquire); + if (bthread::butex_wait(_read_butex, expected_val, &duetime) < 0) { + if (errno != EWOULDBLOCK && errno != ETIMEDOUT) { + return -1; + } + } + } else { + return -1; + } + } else if (nr == 0) { + errno = EEOF; + return -1; + } else { + received += nr; + } + } while (received < len); + return 0; +} + +int UBShmEndpoint::WriteToFd(void* data, size_t len) { + CHECK(data != NULL); + int nw = 0; + size_t written = 0; + do { + const timespec duetime = butil::milliseconds_from_now(WAIT_TIMEOUT_MS); + nw = write(_socket->fd(), (uint8_t*)data + written, len - written); + if (nw < 0) { + if (errno == EAGAIN) { + if (_socket->WaitEpollOut(_socket->fd(), true, &duetime) < 0) { + if (errno != ETIMEDOUT) { + return -1; + } + } + } else { + return -1; + } + } else { + written += nw; + } + } while (written < len); + return 0; +} + +inline void UBShmEndpoint::TryReadOnTcp() { + if (_socket->_nevent.fetch_add(1, 
butil::memory_order_acq_rel) == 0) { + if (_state == FALLBACK_TCP) { + InputMessenger::OnNewMessages(_socket); + } else if (_state == ESTABLISHED) { + TryReadOnTcpDuringRdmaEst(_socket); + } + } +} + +void* UBShmEndpoint::ProcessHandshakeAtClient(void* arg) { + UBShmEndpoint* ep = static_cast<UBShmEndpoint*>(arg); + SocketUniquePtr s(ep->_socket); + UBConnect::RunGuard rg((UBConnect*)s->_app_connect.get()); + + LOG_IF(INFO, FLAGS_ub_trace_verbose) + << "Start handshake on " << s->_local_side; + + uint8_t data[g_ub_hello_msg_len]; + + ep->_state = C_ALLOC_SHM; + auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get()); + size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE; + SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)s->fd()}; + auto shm_name_str = butil::endpoint2str(s->local_side()); + const char* shm_name = shm_name_str.c_str(); + if (ep->AllocateClientResources(&local_trx_shm, shm_name) < 0) { + LOG(WARNING) << "Fallback to tcp:" << s->description(); + ub_transport->_ub_state = UBShmTransport::UB_OFF; + ep->_state = FALLBACK_TCP; + return NULL; + } + + ep->_state = C_HELLO_SEND; + HelloMessage local_msg; + local_msg.msg_len = g_ub_hello_msg_len; + local_msg.hello_ver = g_ub_hello_version; + local_msg.impl_ver = g_ub_impl_version; + local_msg.len = local_shm_len; + memcpy(local_msg.shm_name, local_trx_shm.name, SHM_MAX_NAME_BUFF_LEN); + memcpy(data, MAGIC_STR, MAGIC_STR_LEN); + local_msg.Serialize((char*)data + MAGIC_STR_LEN); + if (ep->WriteToFd(data, g_ub_hello_msg_len) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to send hello message to server:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + LOG_IF(INFO, FLAGS_ub_trace_verbose) << "client handshake message : " << local_msg.toString(); + + ep->_state = C_HELLO_WAIT; + if (ep->ReadFromFd(data, 
MAGIC_STR_LEN) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to get hello message from server:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) { + LOG(WARNING) << "Read unexpected data during handshake:" << s->description(); + s->SetFailed(EPROTO, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(EPROTO)); + ep->_state = FAILED; + return NULL; + } + + if (ep->ReadFromFd(data, HELLO_MSG_LEN_MIN - MAGIC_STR_LEN) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to get Hello Message from server:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + HelloMessage remote_msg; + remote_msg.Deserialize(data); + if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) { + LOG(WARNING) << "Fail to parse Hello Message length from server:" + << s->description(); + s->SetFailed(EPROTO, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(EPROTO)); + ep->_state = FAILED; + return NULL; + } + + if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) { + // TODO: Read Hello Message customized data + // Just for future use, should not happen now + } + + if (!HelloNegotiationValid(remote_msg)) { + LOG(WARNING) << "Fail to negotiate with server, fallback to tcp:" + << s->description(); + ub_transport->_ub_state = UBShmTransport::UB_OFF; + } else { + ep->_state = C_MAP_REMOTE_SHM; + if (ep->_ub_ring->UbrMapRemoteShm(&local_trx_shm, shm_name) < 0) { + LOG(WARNING) << "Fail to map the remote shm, fallback to tcp:" << s->description(); + ub_transport->_ub_state = UBShmTransport::UB_OFF; + } else { + ub_transport->_ub_state = UBShmTransport::UB_ON; + } + } + + ep->_state = C_ACK_SEND; + 
uint32_t flags = 0; + if (ub_transport->_ub_state != UBShmTransport::UB_OFF) { + flags |= ACK_MSG_UB_OK; + } + uint32_t* tmp = (uint32_t*)data; + *tmp = butil::HostToNet32(flags); + if (ep->WriteToFd(data, ACK_MSG_LEN) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to send Ack Message to server:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + + if (ub_transport->_ub_state == UBShmTransport::UB_ON) { + ep->_state = ESTABLISHED; + LOG_IF(INFO, FLAGS_ub_trace_verbose) + << "Client handshake ends (use ubring) on " << s->description(); + } else { + ep->_state = FALLBACK_TCP; + LOG_IF(INFO, FLAGS_ub_trace_verbose) + << "Client handshake ends (use tcp) on " << s->description(); + } + + errno = 0; + + return NULL; +} + +void* UBShmEndpoint::ProcessHandshakeAtServer(void* arg) { + UBShmEndpoint* ep = static_cast<UBShmEndpoint*>(arg); + SocketUniquePtr s(ep->_socket); + + LOG_IF(INFO, FLAGS_ub_trace_verbose) + << "Start handshake on " << s->description(); + + uint8_t data[g_ub_hello_msg_len]; + + ep->_state = S_HELLO_WAIT; + if (ep->ReadFromFd(data, MAGIC_STR_LEN) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to read Hello Message from client:" << s->description() << " " << s->_remote_side; + s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + auto* ub_transport = static_cast<UBShmTransport*>(s->_transport.get()); + if (memcmp(data, MAGIC_STR, MAGIC_STR_LEN) != 0) { + LOG_IF(INFO, FLAGS_ub_trace_verbose) << "It seems that the " + << "client does not use RDMA, fallback to TCP:" + << s->description(); + s->_read_buf.append(data, MAGIC_STR_LEN); + ep->_state = FALLBACK_TCP; + ub_transport->_ub_state = UBShmTransport::UB_OFF; + ep->TryReadOnTcp(); + return NULL; + } + + if 
(ep->ReadFromFd(data, g_ub_hello_msg_len - MAGIC_STR_LEN) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to read Hello Message from client:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + + HelloMessage remote_msg; + remote_msg.Deserialize(data); + LOG_IF(INFO, FLAGS_ub_trace_verbose) << "server receive handshake message : " << remote_msg.toString(); + if (remote_msg.msg_len < HELLO_MSG_LEN_MIN) { + LOG(WARNING) << "Fail to parse Hello Message length from client:" + << s->description(); + s->SetFailed(EPROTO, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(EPROTO)); + ep->_state = FAILED; + return NULL; + } + if (remote_msg.msg_len > HELLO_MSG_LEN_MIN) { + // TODO: Read Hello Message customized header + // Just for future use, should not happen now + } + + if (!HelloNegotiationValid(remote_msg)) { + LOG(WARNING) << "Fail to negotiate with client, fallback to tcp:" + << s->description(); + ub_transport->_ub_state = UBShmTransport::UB_OFF; + } else { + ep->_state = S_ALLOC_SHM; + ubring::SHM remote_trx_shm = {NULL, remote_msg.len, 0, {0}, (uint32_t)ep->_socket->fd()}; + strncpy(remote_trx_shm.name, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN); + + size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE; + // server端共享内存名称 + ubring::SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)ep->_socket->fd()}; + char clientName[SHM_MAX_NAME_BUFF_LEN]; + strncpy(clientName, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN); + + char *clientIpPort = strrchr(clientName, '_'); + if (clientIpPort != NULL) { + *clientIpPort = '\0'; + } + int result = snprintf(local_trx_shm.name, SHM_MAX_NAME_BUFF_LEN, "%s_%s", + clientName, SERVER_SHM_NAME_SUFFIX); + if (UNLIKELY(result < 0)) { + LOG(WARNING) << "Copy client shared memory name failed, ret=" << result; + 
ub_transport->_ub_state = UBShmTransport::UB_OFF; + } + if (result >= 0 && ep->AllocateServerResources(&remote_trx_shm, &local_trx_shm) < 0) { + LOG(WARNING) << "Fail to allocate ub resources, fallback to tcp:" + << s->description(); + ub_transport->_ub_state = UBShmTransport::UB_OFF; + } + } + + ep->_state = S_HELLO_SEND; + HelloMessage local_msg; + local_msg.msg_len = g_ub_hello_msg_len; + if (ub_transport->_ub_state == UBShmTransport::UB_OFF) { + local_msg.impl_ver = 0; + local_msg.hello_ver = 0; + } else { + local_msg.hello_ver = g_ub_hello_version; + local_msg.impl_ver = g_ub_impl_version; + local_msg.len = (FLAGS_data_queue_size) * MB_TO_BYTE; + memcpy(local_msg.shm_name, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN); + } + memcpy(data, MAGIC_STR, MAGIC_STR_LEN); + local_msg.Serialize((char*)data + MAGIC_STR_LEN); + if (ep->WriteToFd(data, g_ub_hello_msg_len) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to send Hello Message to client:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete ub handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + + ep->_state = S_ACK_WAIT; + if (ep->ReadFromFd(data, ACK_MSG_LEN) < 0) { + const int saved_errno = errno; + PLOG(WARNING) << "Fail to read ack message from client:" << s->description(); + s->SetFailed(saved_errno, "Fail to complete ubring handshake from %s: %s", + s->description().c_str(), berror(saved_errno)); + ep->_state = FAILED; + return NULL; + } + + uint32_t* tmp = (uint32_t*)data; + uint32_t flags = butil::NetToHost32(*tmp); + if (flags & ACK_MSG_UB_OK) { + if (ub_transport->_ub_state == UBShmTransport::UB_OFF) { + LOG(WARNING) << "Fail to parse Hello Message length from client:" + << s->description(); + s->SetFailed(EPROTO, "Fail to complete ub handshake from %s: %s", + s->description().c_str(), berror(EPROTO)); + ep->_state = FAILED; + return NULL; + } else { + ub_transport->_ub_state = UBShmTransport::UB_ON; + 
ep->_state = ESTABLISHED; + LOG_IF(INFO, FLAGS_ub_trace_verbose) + << "Server handshake ends (use ubring) on " << s->description(); + } + } else { + ub_transport->_ub_state = UBShmTransport::UB_OFF; + ep->_state = FALLBACK_TCP; + LOG_IF(INFO, FLAGS_ub_trace_verbose) + << "Server handshake ends (use tcp) on " << s->description(); + } + ep->TryReadOnTcp(); + + return NULL; +} + +bool UBShmEndpoint::IsWritable() const { + if (BAIDU_UNLIKELY(g_skip_ub_init)) { + // Just for UT + return false; + } + auto ret = _ub_ring->IsUbrTrxWriteable(EPOLLET); + if (ret == 0) { + return true; + } + return false; +} + +ssize_t UBShmEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) { + if (BAIDU_UNLIKELY(g_skip_ub_init)) { + // Just for UT + errno = EAGAIN; + return -1; + } + if (BAIDU_UNLIKELY(ndata == 0)) { + return 0; + } + struct iovec vec[IOBUF_IOV_MAX]; + size_t nvec = 0; + for (size_t i = 0; i < ndata; ++i) { + const butil::IOBuf* p = from[i]; + const size_t nref = p->backing_block_num(); + for (size_t j = 0; j < nref && nvec < IOBUF_IOV_MAX; ++j, ++nvec) { + butil::StringPiece sp = p->backing_block(j); + vec[nvec].iov_base = const_cast<char*>(sp.data()); + vec[nvec].iov_len = sp.size(); + } + } + + ssize_t nw = 0; + errno = 0; + nw = _ub_ring->UbrTrxWritev(vec, nvec); + if (UNLIKELY(nw == -1)) { + if (errno == EMSGSIZE) { + LOG(ERROR) << "Non-blocking send msg failed, message is larger than ubring capacity."; + } else { + LOG(ERROR) << "Non-blocking send msg in failed, connection has been closed."; + errno = EPIPE; + } + } else if (UNLIKELY(nw == UBRING_RETRY)) { + errno = EAGAIN; + nw = -1; + } + if (nw <= 0) { + return nw; + } + size_t npop_all = nw; + for (size_t i = 0; i < ndata; ++i) { + npop_all -= from[i]->pop_front(npop_all); + if (npop_all == 0) { + break; + } + } + return nw; +} + +int UBShmEndpoint::AllocateClientResources(ubring::SHM* local_trx_shm, const char* shm_name) { + if (BAIDU_UNLIKELY(g_skip_ub_init)) { + // For UT + return 0; + } + + 
CHECK(_ub_ring == NULL); + // TODO: Pooling management + _ub_ring = new UBRing(); + + SocketOptions options; + options.user = this; + options.keytable_pool = _socket->_keytable_pool; + if (Socket::Create(options, &_cq_sid) < 0) { + PLOG(WARNING) << "Fail to create socket for cq"; + return -1; + } + int ret = _ub_ring->UbrAllocateLocalShm(local_trx_shm, shm_name); + if (ret != 0) { + return ret; + } + PollerRegisterEvent(CqSidOp::ADD, EPOLLIN); + return 0; +} + +int UBShmEndpoint::AllocateServerResources(ubring::SHM* remote_trx_shm, ubring::SHM* local_trx_shm) { + if (BAIDU_UNLIKELY(g_skip_ub_init)) { + // For UT + return 0; + } + + CHECK(_ub_ring == NULL); + // TODO: Pooling management + _ub_ring = new UBRing(); + + SocketOptions options; + options.user = this; + options.keytable_pool = _socket->_keytable_pool; + if (Socket::Create(options, &_cq_sid) < 0) { + PLOG(WARNING) << "Fail to create socket for cq"; + return -1; + } + int ret = _ub_ring->UbrAllocateServerShm(remote_trx_shm, local_trx_shm); + if (ret != 0) { + return ret; + } + // TODO mwj 是否应该在连接之后再进行轮询? Review Comment: Please use English. ########## src/brpc/ubshm/ub_ring.cpp: ########## @@ -0,0 +1,1091 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include <errno.h> +#include <iostream> +#include <gflags/gflags.h> +#include <unistd.h> +#include <ctime> +#include "bthread/bthread.h" +#include "butil/logging.h" +#include "brpc/ubshm/ub_ring.h" +#include "brpc/ubshm/ub_ring_manager.h" +#include "brpc/ubshm/shm/shm_ipc.h" + +namespace brpc { +namespace ubring { +uint32_t g_sleepTime[UBR_TASK_STEP_NUM] = {0}; +#define TIME_COVERSION 1000 +DEFINE_int32(ub_disconnect_timeout, 5, "Ubshm disconnection timeout."); +DEFINE_int32(ub_connect_timeout, 1, "Ubshm connection timeout."); +DEFINE_int32(ub_hb_timer_interval, 5, "Heartbeat timer interval."); +DEFINE_int32(ub_hb_retry_cnt, 10, "Heartbeat retry times."); +DEFINE_int32(ub_event_queue_timer_interval, 100, "Interval of the disconnection timer."); + +UBRing::UBRing() +{} +UBRing::~UBRing() +{} + +RETURN_CODE UBRing::UbrTrxMapShm(SHM *localShm, SHM *remoteShm) +{ + RETURN_CODE rc = UbrTrxMapLocalShm(localShm); + if (UNLIKELY(rc != UBRING_OK)) { + LOG(ERROR) << "Trx map local shared memory failed."; + return rc; + } + rc = UbrTrxMapRemoteShm(remoteShm); + if (UNLIKELY(rc != UBRING_OK)) { + LOG(ERROR) << "Trx map remote shared memory failed."; + return rc; + } + return UBRING_OK; +} + +RETURN_CODE UBRing::UbrTrxClose() { + RETURN_CODE closeCheckRc = UbrTrxCloseCheck(_trx); + if (UNLIKELY(closeCheckRc != UBRING_OK)) { + if (closeCheckRc == UBRING_REENTRY) { + LOG(INFO) << "Trx close skipped, already closing, local name=" << _trx->localShm.name; + return UBRING_OK; + } + return UBRING_ERR; + } + if (_trx->ubrRx.remoteTxEventQ.addr != nullptr) { + ((UbrEventQMsg *)_trx->ubrRx.remoteTxEventQ.addr)->flag = UBR_STATE_CLOSING; + } + + uint32_t disconnectTimeout = FLAGS_ub_disconnect_timeout; + uint64_t startTime = GetCurNanoSeconds(); + + if (_trx->ubrTx.localTxEventQ.addr != nullptr && ((UbrEventQMsg *)_trx->ubrTx.localTxEventQ.addr)->flag == UBR_STATE_CONNECTED) { + ((UbrEventQMsg *)_trx->ubrTx.localTxEventQ.addr)->flag = UBR_STATE_CLOSED; + _trx->ubrTx.trxState = 
UBR_STATE_CLOSED; + } + + if (_trx->ubrTx.remoteRxEventQ.addr != nullptr) { + ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->flag = UBR_STATE_CLOSED; + } + while (_trx->ubrRx.localRxEventQ.addr != nullptr && ((UbrEventQMsg *)_trx->ubrRx.localRxEventQ.addr)->flag != UBR_STATE_CLOSED) { + UbrSetSleepTask(UBR_TASK_CLOSE); + if (HasTimedOut(startTime, disconnectTimeout) != UBRING_OK) { + LOG(WARNING) << "Local shm " << _trx->localShm.name + << " wait for the peer to close timed out, force cleanup."; + _trx->ubrRx.trxState = UBR_STATE_CLOSED; + // Force synchronous cleanup instead of relying on async timer + DeleteTimerSafe((uint32_t)_trx->timerFd); + DeleteTimerSafe((uint32_t)_trx->hbTimerFd); + if (_trx->ubrTx.remoteRxEventQ.addr != nullptr) { + ((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->flag = UBR_STATE_CLOSED; + } + if (UNLIKELY(UbrTrxFreeShm(_trx) != UBRING_OK)) { + LOG(WARNING) << "Force close, local shm " << _trx->localShm.name << " free failed."; + } + if (UNLIKELY(UBRingManager::ReleaseUbrTrxFromMgr(_trx) != UBRING_OK)) { + LOG(WARNING) << "Force close, release trx " << _trx->localShm.name << " failed."; + } + return UBRING_ERR_TIMEOUT; + } + bthread_usleep(1000); // 1ms, yield to other bthreads + } + _trx->ubrRx.trxState = UBR_STATE_CLOSED; + RETURN_CODE rc; + if (UNLIKELY((rc = ClearTrxResource(_trx, startTime, UBR_SEND_CLOSE)) != UBRING_OK)) { + if (rc == UBRING_REENTRY) { + LOG(INFO) << "Trx close, peer is closing, trx local name=" << _trx->localShm.name; + return UBRING_OK; + } + LOG(ERROR) << "Trx close, clear trx resource failed, trx local name=" << _trx->localShm.name; + return UBRING_ERR; + } + // Unlink local shm name immediately so process exit does not leave visible leftovers. 
+ RETURN_CODE unlinkRc = ShmFree(&_trx->localShm); + if (unlinkRc != UBRING_OK && unlinkRc != SHM_ERR_NOT_FOUND && unlinkRc != SHM_ERR_RESOURCE_ATTACHED) { + LOG(WARNING) << "Trx close, unlink local shm failed, trx local name=" << _trx->localShm.name + << ", rc=" << unlinkRc; + } + return UBRING_OK; +} + +RETURN_CODE UBRing::UbrAddCloseTimer() { + if (UNLIKELY(_trx == NULL)) { + LOG(ERROR) << "Trx add close timer failed, trx is null."; + return UBRING_ERR; + } + + uint32_t eventQTimerInterval = FLAGS_ub_event_queue_timer_interval * TIME_COVERSION; + itimerspec timeSpec = { + .it_interval = {.tv_sec = 0, .tv_nsec = eventQTimerInterval}, + .it_value = {.tv_sec = 0, .tv_nsec = 1} + }; + int timerFd = TimerStart(&timeSpec, UbrTrxCloseCallback, (void*)_trx); + if (UNLIKELY(timerFd == -1)) { + LOG(ERROR) << "Start ubr close timer failed, trx local name=" << _trx->localShm.name; + return UBRING_ERR; + } + _trx->timerFd = timerFd; + return UBRING_OK; +} + +RETURN_CODE UBRing::UbrAddTimer() { + if (UNLIKELY(UbrAddCloseTimer() != UBRING_OK)) { + LOG(ERROR) << "Ubr " << _trx->localShm.name << " add closed timer failed."; + return UBRING_ERR; + } + + if (UNLIKELY(UbrAddHBTimer() != UBRING_OK)) { + DeleteTimerSafe((uint32_t)_trx->timerFd); + LOG(ERROR) << "Ubr " << _trx->localShm.name << " add heartbeat timer failed."; + return UBRING_ERR; + } + return UBRING_OK; +} + +void* UBRing::UbrTrxCloseCallback(void* args) { + auto* trx = (UbrTrx*) args; + if (UNLIKELY(UBRing::UbrTrxCallbackCheck(trx) != UBRING_OK)) { + return nullptr; + } + + auto* localRxEventQ = (UbrEventQMsg *)trx->ubrRx.localRxEventQ.addr; + auto* localTxEventQ = (UbrEventQMsg *)trx->ubrTx.localTxEventQ.addr; + if (localRxEventQ->flag != UBR_STATE_CLOSED || localTxEventQ->flag == UBR_STATE_CLOSED) { + return nullptr; + } + trx->ubrRx.trxState = UBR_STATE_CLOSED; + int fd = (int)trx->localShm.fd; + do { + if (ATOMIC_LOAD(trx->closeCnt) == 0) { + break; + } + ATOMIC_SUB(trx->closeCnt, 1); + + uint64_t startTime = 
GetCurNanoSeconds(); + + if (localTxEventQ->flag == UBR_STATE_CONNECTED || ATOMIC_LOAD(trx->closeCnt) == 1) { + localTxEventQ->flag = UBR_STATE_CLOSED; + trx->ubrTx.trxState = UBR_STATE_CLOSED; + } + UbrEventQMsg* remoteRxEventQ = (UbrEventQMsg *)trx->ubrTx.remoteRxEventQ.addr; + if (remoteRxEventQ == nullptr) { + LOG(ERROR) << "Trx close callback failed, " << trx->localShm.name << " remoteRxEventQ is NULL."; + break; + } + remoteRxEventQ->flag = UBR_STATE_CLOSED; + RETURN_CODE clearRc = ClearTrxResource(trx, startTime, UBR_CALL_BACK_CLOSE, 1); + if (UNLIKELY(clearRc != UBRING_OK && clearRc != UBRING_REENTRY)) { + LOG(ERROR) << "Trx close callback failed, " << trx->localShm.name << " clear trx resource failed."; + break; + } + } while (0); + return nullptr; +} + +RETURN_CODE UBRing::UbrAddHBTimer() { + if (UNLIKELY(_trx == NULL)) { + LOG(ERROR) << "Trx add heartbeat timer failed, trx is null."; + return UBRING_ERR; + } + + itimerspec timeSpec = { + .it_interval = {.tv_sec = FLAGS_ub_hb_timer_interval, .tv_nsec = 0}, + .it_value = {.tv_sec = 0, .tv_nsec = 1} + }; + int timerFd = TimerStart(&timeSpec, UbrTrxHBCallback, (void*)_trx); + if (UNLIKELY(timerFd == -1)) { + LOG(ERROR) << "Start ubr heartbeat timer failed."; + return UBRING_ERR; + } + _trx->hbTimerFd = timerFd; + return UBRING_OK; +} + +RETURN_CODE UBRing::UbrPassiveClearTrx(UbrTrx *trx, int fd, PASSIVE_DISC_TYPE type) { + RETURN_CODE passiveCloseCheckRc = UbrTrxCloseCheck(trx); + if (UNLIKELY(passiveCloseCheckRc != UBRING_OK)) { + if (passiveCloseCheckRc == UBRING_REENTRY) { + LOG(INFO) << "Passive close skipped, active close in progress, name=" << trx->localShm.name; + uint64_t startTime = GetCurNanoSeconds(); + return ClearTrxResource(trx, startTime, UBR_CALL_BACK_CLOSE); + } + return UBRING_ERR; + } + trx->ubrTx.trxState = UBR_STATE_CLOSED; + trx->ubrRx.trxState = UBR_STATE_CLOSED; + DeleteTimerSafe((uint32_t)trx->timerFd); + const char *typeName = NULL; + if (type == UBR_HEARTBEAT) { + 
DeleteTimer((uint32_t)trx->hbTimerFd); + typeName = "Trx heartbeat"; + } else if (type == UBR_UB_EVENT) { + DeleteTimerSafe((uint32_t)trx->hbTimerFd); + typeName = "Ub event callback"; + } + bthread_usleep(FLAGS_ub_flying_io_timeout * 1000000LL); // yield-friendly sleep + + int rc = ShmLocalFree(&trx->remoteShm); + if (rc != UBRING_OK) { + LOG(ERROR) << typeName << ", delete remote shm failed. ret=" << rc; + } + rc = ShmLocalFree(&trx->localShm); + if (rc != UBRING_OK) { + LOG(ERROR) << typeName << ", delete local shm failed. ret=" << rc; + } + + UBRingManager::ReleaseUbrTrxFromMgr(trx); + return UBRING_OK; +} + +void* UBRing::UbrTrxHBCallback(void* args) { + auto* trx = (UbrTrx*) args; + if (UNLIKELY(UbrTrxCallbackCheck(trx) != UBRING_OK)) { + return NULL; + } + + auto* localDataStatus = (UbrDataStatusQMsg *)trx->ubrTx.localDataStatusQ.addr; + auto* remoteDataStatus = (UbrDataStatusQMsg *)trx->ubrRx.remoteDataStatusQ.addr; + if (UNLIKELY(localDataStatus == NULL || remoteDataStatus == NULL)) { + LOG(ERROR) << "Heartbeat error, datastatus is NULL."; + return NULL; + } + + if (trx->ubrTx.trxState != UBR_STATE_CONNECTED || trx->ubrRx.trxState != UBR_STATE_CONNECTED) { + LOG_EVERY_SECOND(INFO) << "Heartbeat cannot be started, wait connected state."; + return NULL; + } + + remoteDataStatus->heartBeat = 1; + if (localDataStatus->heartBeat == 1) { + localDataStatus->heartBeat = 0; + trx->ubrTx.hbRetryCnt = 0; + return NULL; + } + + ++trx->ubrTx.hbRetryCnt; + if (trx->ubrTx.hbRetryCnt <= FLAGS_ub_hb_retry_cnt) { + return NULL; + } + + int fd = (int)trx->localShm.fd; + LOG(INFO) << "Hlc heartbeat, start to clear trx resource. 
hbTimerFd=" << fd << ", shmName=" << trx->localShm.name; + UbrPassiveClearTrx(trx, fd, UBR_HEARTBEAT); + LOG(INFO) << "Hlc heartbeat clear trx resource finish."; + return NULL; +} + +RETURN_CODE UBRing::UbrAddAsynClearTimer(UbrTrx *trx) { + if (UNLIKELY(trx == NULL)) { + LOG(ERROR) << "Trx add close timer failed, trx is null."; + return UBRING_ERR; + } + + if (trx->clearTimerFd > 0) { + return UBRING_OK; + } + + itimerspec timeSpec = { + .it_interval = {.tv_sec = 0, .tv_nsec = 0}, + .it_value = {.tv_sec = FLAGS_ub_flying_io_timeout, .tv_nsec = 0} + }; + + int timerFd = TimerStart(&timeSpec, UbrAsynClearCallback, (void*)trx); + if (UNLIKELY(timerFd == -1)) { + LOG(ERROR) << "Start ubr close timer failed, trx name=%s.", trx->localShm.name; + return UBRING_ERR; + } + trx->clearTimerFd = timerFd; + return UBRING_OK; +} + +void *UBRing::UbrAsynClearCallback(void *args) +{ + auto* trx = (UbrTrx*) args; + if (UNLIKELY(trx == NULL)) { + LOG(ERROR) << "Trx close, trx is null."; + return NULL; + } + + if (UNLIKELY(UbrTrxFreeShm(trx) != UBRING_OK)) { + LOG(ERROR) << "Trx close, wait for local shm " << trx->localShm.name << " free fail."; + } + + if (UNLIKELY(UBRingManager::ReleaseUbrTrxFromMgr(trx) != UBRING_OK)) { + LOG(ERROR) << "Trx close, release shm " << trx->localShm.name << " trx failed."; + } + return NULL; +} + +int UBRing::UbrTrxSend(const void *buf, uint32_t bufLen) +{ + if (UNLIKELY(CheckTrxSendPreCheck(_trx) != UBRING_OK)) { + return UBRING_ERR; + } + // 1.2 计算空间 Review Comment: Please use English. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
