GitHub user shach1990 edited a discussion: 大量事务消息出现反复回查的情况
**请教各位是否遇到过类似的问题,初步怀疑是否时因为 transientStorePoolEnable 为 true导致,但本地无法复现,但线上环境100%出现,有没有解决方案呢?** ### 背景 事务消息基本上存在反复回查的情况,应用的`TransactionListener`的`executeLocalTransaction`和`checkLocalTransaction`方法都确认返回`COMMIT_MESSAGE`,大部分消息回查1次成功,还有一部分回查几次才成功,还有小部分回查15次都不正常就丢失了。 ### 环境 集群3主3从,版本V4.8.0。 ### 现象 通过Arthas查到回查调用(`org.apache.rocketmq.broker.processor.EndTransactionProcessor.processRequest`)返回`Find prepared transaction message failed`错误日志如下: ``` method=org.apache.rocketmq.broker.processor.EndTransactionProcessor.processRequest location=AtExit ts=2023-06-27 10:26:48; [cost=0.060751ms] result=@ArrayList[ @Object[][ @DefaultChannelHandlerContext[io.netty.channel.DefaultChannelHandlerContext@6bb6e1a1], @RemotingCommand[RemotingCommand [code=37, language=JAVA, version=373, opaque=31756804, flag(B)=10, remark=null, extFields={producerGroup=warehouseProducer, commitLogOffset=48902747609, msgId=7F00000100016801B4148821A5ACB356, tranStateTableOffset=3210422, commitOrRollback=8, transactionId=7F00000100016801B4148821A5ACB356, fromTransactionCheck=true}, serializeTypeCurrentRPC=JSON]], ], @RemotingCommand[ SERIALIZE_TYPE_PROPERTY=@String[rocketmq.serialize.type], SERIALIZE_TYPE_ENV=@String[ROCKETMQ_SERIALIZE_TYPE], REMOTING_VERSION_KEY=@String[rocketmq.remoting.version], log=@Slf4jLogger[org.apache.rocketmq.logging.Slf4jLoggerFactory$Slf4jLogger@69060658], RPC_TYPE=@Integer[0], RPC_ONEWAY=@Integer[1], CLASS_HASH_MAP=@HashMap[isEmpty=false;size=31], CANONICAL_NAME_CACHE=@HashMap[isEmpty=false;size=5], NULLABLE_FIELD_CACHE=@HashMap[isEmpty=false;size=8], STRING_CANONICAL_NAME=@String[java.lang.String], DOUBLE_CANONICAL_NAME_1=@String[java.lang.Double], DOUBLE_CANONICAL_NAME_2=@String[double], INTEGER_CANONICAL_NAME_1=@String[java.lang.Integer], INTEGER_CANONICAL_NAME_2=@String[int], LONG_CANONICAL_NAME_1=@String[java.lang.Long], LONG_CANONICAL_NAME_2=@String[long], BOOLEAN_CANONICAL_NAME_1=@String[java.lang.Boolean], BOOLEAN_CANONICAL_NAME_2=@String[boolean], configVersion=@Integer[373], requestId=@AtomicInteger[-1931087276], serializeTypeConfigInThisServer=@SerializeType[JSON], code=@Integer[1], language=@LanguageCode[JAVA], version=@Integer[373], opaque=@Integer[-1931087279], flag=@Integer[1], remark=@String[Find prepared transaction message failed], extFields=null, customHeader=null, serializeTypeCurrentRPC=@SerializeType[JSON], body=null, ], ] ``` `store.log` 也大量出现`selectMappedBuffer request pos invalid`日志: ``` 2023-06-23 13:40:05 WARN EndTransactionThread_3 - selectMappedBuffer request pos invalid, request pos: 630412830, size: 4, fileFromOffset: 36507222016 2023-06-23 13:40:21 WARN EndTransactionThread_1 - selectMappedBuffer request pos invalid, request pos: 631183337, size: 4, fileFromOffset: 36507222016 2023-06-23 13:40:21 WARN EndTransactionThread_6 - selectMappedBuffer request pos invalid, request pos: 631185244, size: 4, fileFromOffset: 36507222016 2023-06-23 13:40:21 WARN EndTransactionThread_16 - selectMappedBuffer request pos invalid, request pos: 631187159, size: 4, fileFromOffset: 36507222016 2023-06-23 13:40:34 INFO StoreStatsService - [STORETPS] put_tps 27.1639502716395 get_found_tps 154.8845115488451 get_miss_tps 340.0826584008266 get_transfered_tps 162.05046162050462 2023-06-23 13:40:34 INFO StoreStatsService - [PAGECACHERT] TotalPut 1630, PutMessageDistributeTime [<=0ms]:1609 [0~10ms]:21 [10~50ms]:0 [50~100ms]:0 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0 2023-06-23 13:40:35 WARN EndTransactionThread_18 - selectMappedBuffer request pos invalid, request pos: 632039772, size: 4, fileFromOffset: 36507222016 ``` 查看源码得知,就是因为`org.apache.rocketmq.store.MappedFile#selectMappedBuffer(int, int)`查询不到消息,报WARN日志`selectMappedBuffer request pos invalid`,然后导致`org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#getHalfMessageByOffset`返回`Find prepared transaction message failed`错误。 ### Broker 配置 ``` serverSelectorThreads=3 brokerRole=ASYNC_MASTER serverSocketRcvBufSize=131072 osPageCacheBusyTimeOutMills=1000 shortPollingTimeMills=1000 clientSocketRcvBufSize=131072 clusterTopicEnable=true brokerTopicEnable=true autoCreateTopicEnable=true maxErrorRateOfBloomFilter=20 maxMsgsNumBatch=64 cleanResourceInterval=10000 commercialBaseCount=1 maxTransferCountOnMessageInMemory=32 brokerFastFailureEnable=true brokerClusterName=rocketmq-cluster flushDiskType=ASYNC_FLUSH mapedFileSizeCommitLog=1073741824 commercialBigCount=1 mappedFileSizeConsumeQueue=6000000 consumerFallbehindThreshold=17179869184 autoCreateSubscriptionGroup=true transientStorePoolEnable=true flushConsumerOffsetInterval=5000 checkTransactionMessageEnable=false waitTimeMillsInHeartbeatQueue=31000 diskMaxUsedSpaceRatio=88 flushCommitLogLeastPages=4 cleanFileForciblyEnable=true slaveReadEnable=true msgTraceTopicName=RMQ_SYS_TRACE_TOPIC expectConsumerNumUseFilter=32 traceTopicEnable=false useEpollNativeSelector=false enablePropertyFilter=false messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h deleteCommitLogFilesInterval=100 brokerName=broker-a maxTransferBytesOnMessageInDisk=65536 mapedFileSizeConsumeQueue=300000 listenPort=23301 flushConsumeQueueLeastPages=2 pullMessageThreadPoolNums=128 useReentrantLockWhenPutMessage=true flushIntervalConsumeQueue=1000 sendThreadPoolQueueCapacity=10000 debugLockEnable=false haHousekeepingInterval=20000 diskFallRecorded=true messageIndexEnable=true clientAsyncSemaphoreValue=65535 clientCallbackExecutorThreads=8 putMsgIndexHightWater=600000 sendMessageThreadPoolNums=32 clientManagerThreadPoolQueueCapacity=1000000 serverSocketSndBufSize=131072 maxDelayTime=40 clientSocketSndBufSize=131072 namesrvAddr=ip1;ip2;ip3 commercialEnable=true maxHashSlotNum=5000000 heartbeatThreadPoolNums=8 transactionTimeOut=6000 maxMessageSize=65536 adminBrokerThreadPoolNums=16 defaultQueryMaxNum=32 maxTransferBytesOnMessageInMemory=262144 forceRegister=true enableConsumeQueueExt=false longPollingEnable=true serverWorkerThreads=8 messageIndexSafe=false deleteConsumeQueueFilesInterval=100 haSlaveFallbehindMax=268435456 serverCallbackExecutorThreads=0 flushCommitLogThoroughInterval=10000 isEnableBatchPush=false commercialTimerCount=1 enableDLegerCommitLog=false useTLS=false redeleteHangedFileInterval=120000 flushIntervalCommitLog=500 rocketmqHome=/app/rocketmq queryMessageThreadPoolNums=16 destroyMapedFileInterval=120000 messageStorePlugIn= serverChannelMaxIdleTimeSeconds=120 maxIndexNum=20000000 filterDataCleanTimeSpan=86400000 filterServerNums=0 commitCommitLogLeastPages=4 waitTimeMillsInPullQueue=5000 haSendHeartbeatInterval=5000 processReplyMessageThreadPoolNums=32 clientChannelMaxIdleTimeSeconds=120 filterSupportRetry=false flushDelayOffsetInterval=10000 duplicationEnable=false replyThreadPoolQueueCapacity=10000 offsetCheckInSlave=false clientCloseSocketIfTimeout=false transientStorePoolSize=10 waitTimeMillsInSendQueue=5000 warmMapedFileEnable=false endTransactionThreadPoolNums=24 flushCommitLogTimed=false flushLeastPagesWhenWarmMapedFile=4096 clientWorkerThreads=4 endTransactionPoolQueueCapacity=100000 registerNameServerPeriod=30000 registerBrokerTimeoutMills=6000 accessMessageInMemoryMaxRatio=40 highSpeedMode=false transactionCheckMax=15 checkCRCOnRecover=true destroyMapedFileIntervalForcibly=120000 brokerIP2=ip1 brokerIP1=ip2 commitIntervalCommitLog=200 clientOnewaySemaphoreValue=65535 storeReplyMessageEnable=true traceOn=true clientManageThreadPoolNums=32 channelNotActiveInterval=60000 mappedFileSizeConsumeQueueExt=50331648 consumerManagerThreadPoolQueueCapacity=1000000 serverOnewaySemaphoreValue=256 #haListenPort=23311 enableCalcFilterBitMap=false clientPooledByteBufAllocatorEnable=false aclEnable=false storePathRootDir=/app/rocketmq/data syncFlushTimeout=5000 rejectTransactionMessage=false commitCommitLogThoroughInterval=200 connectTimeoutMillis=3000 queryThreadPoolQueueCapacity=20000 regionId=DefaultRegion consumerManageThreadPoolNums=32 disableConsumeIfConsumerReadSlowly=false flushConsumerOffsetHistoryInterval=60000 fetchNamesrvAddrByAddressServer=false haTransferBatchSize=32768 compressedRegister=false storePathCommitLog=/app/rocketmq/data/commitlog commercialTransCount=1 transactionCheckInterval=60000 mappedFileSizeCommitLog=1073741824 startAcceptSendRequestTimeStamp=0 serverPooledByteBufAllocatorEnable=true serverAsyncSemaphoreValue=64 autoDeleteUnusedStats=false waitTimeMillsInTransactionQueue=3000 heartbeatThreadPoolQueueCapacity=50000 deleteWhen=04 bitMapLengthConsumeQueueExt=112 fastFailIfNoBufferInStorePool=false defaultTopicQueueNums=4 flushConsumeQueueThoroughInterval=60000 notifyConsumerIdsChangedEnable=true brokerPermission=6 fileReservedTime=120 transferMsgByHeap=true pullThreadPoolQueueCapacity=100000 brokerId=0 maxTransferCountOnMessageInDisk=8 traceTopicEnable=true ``` GitHub link: https://github.com/apache/rocketmq/discussions/6954 ---- This is an automatically sent email for dev@rocketmq.apache.org. To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org