GitHub user shach1990 edited a discussion: A large number of transactional messages are being checked back repeatedly

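**Has anyone run into a similar problem? My initial suspicion is that it is caused by `transientStorePoolEnable` being set to true. I cannot reproduce it locally, but it happens 100% of the time in production. Is there a solution?**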

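### Background
Nearly all transactional messages go through repeated transaction checks. I have confirmed that both the `executeLocalTransaction` and `checkLocalTransaction` methods of the application's `TransactionListener` return `COMMIT_MESSAGE`. Most messages succeed after one check, some need several checks before they succeed, and a small fraction are still not resolved after 15 checks and are then lost.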

### Environment
Cluster with 3 masters and 3 slaves, version V4.8.0.


### Symptoms
Using Arthas, I found that the call triggered by the transaction check (`org.apache.rocketmq.broker.processor.EndTransactionProcessor.processRequest`) returns `Find prepared transaction message failed`. The captured output is as follows:
```
method=org.apache.rocketmq.broker.processor.EndTransactionProcessor.processRequest location=AtExit
ts=2023-06-27 10:26:48; [cost=0.060751ms] result=@ArrayList[
    @Object[][
        @DefaultChannelHandlerContext[io.netty.channel.DefaultChannelHandlerContext@6bb6e1a1],
        @RemotingCommand[RemotingCommand [code=37, language=JAVA, version=373, opaque=31756804, flag(B)=10, remark=null, extFields={producerGroup=warehouseProducer, commitLogOffset=48902747609, msgId=7F00000100016801B4148821A5ACB356, tranStateTableOffset=3210422, commitOrRollback=8, transactionId=7F00000100016801B4148821A5ACB356, fromTransactionCheck=true}, serializeTypeCurrentRPC=JSON]],
    ],
    @RemotingCommand[
        SERIALIZE_TYPE_PROPERTY=@String[rocketmq.serialize.type],
        SERIALIZE_TYPE_ENV=@String[ROCKETMQ_SERIALIZE_TYPE],
        REMOTING_VERSION_KEY=@String[rocketmq.remoting.version],
        log=@Slf4jLogger[org.apache.rocketmq.logging.Slf4jLoggerFactory$Slf4jLogger@69060658],
        RPC_TYPE=@Integer[0],
        RPC_ONEWAY=@Integer[1],
        CLASS_HASH_MAP=@HashMap[isEmpty=false;size=31],
        CANONICAL_NAME_CACHE=@HashMap[isEmpty=false;size=5],
        NULLABLE_FIELD_CACHE=@HashMap[isEmpty=false;size=8],
        STRING_CANONICAL_NAME=@String[java.lang.String],
        DOUBLE_CANONICAL_NAME_1=@String[java.lang.Double],
        DOUBLE_CANONICAL_NAME_2=@String[double],
        INTEGER_CANONICAL_NAME_1=@String[java.lang.Integer],
        INTEGER_CANONICAL_NAME_2=@String[int],
        LONG_CANONICAL_NAME_1=@String[java.lang.Long],
        LONG_CANONICAL_NAME_2=@String[long],
        BOOLEAN_CANONICAL_NAME_1=@String[java.lang.Boolean],
        BOOLEAN_CANONICAL_NAME_2=@String[boolean],
        configVersion=@Integer[373],
        requestId=@AtomicInteger[-1931087276],
        serializeTypeConfigInThisServer=@SerializeType[JSON],
        code=@Integer[1],
        language=@LanguageCode[JAVA],
        version=@Integer[373],
        opaque=@Integer[-1931087279],
        flag=@Integer[1],
        remark=@String[Find prepared transaction message failed],
        extFields=null,
        customHeader=null,
        serializeTypeCurrentRPC=@SerializeType[JSON],
        body=null,
    ],
]
```
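
As a sanity check, the `commitOrRollback` field in the request above can be decoded. The sketch below is illustrative only: the class `EndTransactionFields` and its method are hypothetical, and the flag values are assumed to match `org.apache.rocketmq.common.sysflag.MessageSysFlag` (prepared = 4, commit = 8, rollback = 12), which should be confirmed against the 4.8.0 source.

```java
// Hypothetical helper for reading the end-transaction header captured above.
// The constants are assumed to mirror MessageSysFlag; verify against the 4.8.0 source.
final class EndTransactionFields {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // 4
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;   // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // 12

    // Maps the numeric commitOrRollback header value to a readable name.
    static String describe(int commitOrRollback) {
        switch (commitOrRollback) {
            case TRANSACTION_NOT_TYPE:
                return "NOT_TYPE";
            case TRANSACTION_PREPARED_TYPE:
                return "PREPARED";
            case TRANSACTION_COMMIT_TYPE:
                return "COMMIT";
            case TRANSACTION_ROLLBACK_TYPE:
                return "ROLLBACK";
            default:
                return "UNKNOWN(" + commitOrRollback + ")";
        }
    }

    public static void main(String[] args) {
        // commitOrRollback=8 is the value seen in the Arthas output.
        System.out.println(describe(8));
    }
}
```

Under that assumption, `commitOrRollback=8` together with `fromTransactionCheck=true` suggests the producer did answer the check with a commit, so the failure would be on the broker side when it looks up the half message.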

`store.log` also contains a large number of `selectMappedBuffer request pos invalid` entries:
```
2023-06-23 13:40:05 WARN EndTransactionThread_3 - selectMappedBuffer request pos invalid, request pos: 630412830, size: 4, fileFromOffset: 36507222016
2023-06-23 13:40:21 WARN EndTransactionThread_1 - selectMappedBuffer request pos invalid, request pos: 631183337, size: 4, fileFromOffset: 36507222016
2023-06-23 13:40:21 WARN EndTransactionThread_6 - selectMappedBuffer request pos invalid, request pos: 631185244, size: 4, fileFromOffset: 36507222016
2023-06-23 13:40:21 WARN EndTransactionThread_16 - selectMappedBuffer request pos invalid, request pos: 631187159, size: 4, fileFromOffset: 36507222016
2023-06-23 13:40:34 INFO StoreStatsService - [STORETPS] put_tps 27.1639502716395 get_found_tps 154.8845115488451 get_miss_tps 340.0826584008266 get_transfered_tps 162.05046162050462
2023-06-23 13:40:34 INFO StoreStatsService - [PAGECACHERT] TotalPut 1630, PutMessageDistributeTime [<=0ms]:1609 [0~10ms]:21 [10~50ms]:0 [50~100ms]:0 [100~200ms]:0 [200~500ms]:0 [500ms~1s]:0 [1~2s]:0 [2~3s]:0 [3~4s]:0 [4~5s]:0 [5~10s]:0 [10s~]:0
2023-06-23 13:40:35 WARN EndTransactionThread_18 - selectMappedBuffer request pos invalid, request pos: 632039772, size: 4, fileFromOffset: 36507222016
```
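
To relate the `request pos` and `fileFromOffset` values in these log lines to a `commitLogOffset` such as the one in the Arthas output, the arithmetic can be sketched as follows. This assumes the commit log is split into fixed-size files of `mappedFileSizeCommitLog` bytes (1073741824 in the config below) and that the position inside a file is the offset modulo that size; the class `CommitLogOffsets` and its methods are hypothetical names used only for illustration.

```java
// Hypothetical helper: converts between absolute commit log offsets and
// (fileFromOffset, position) pairs, assuming fixed-size commit log files.
final class CommitLogOffsets {
    // mappedFileSizeCommitLog from the broker configuration (1 GiB).
    static final long FILE_SIZE = 1073741824L;

    // Starting offset of the file that would contain the given absolute offset.
    static long fileFromOffset(long commitLogOffset) {
        return commitLogOffset - (commitLogOffset % FILE_SIZE);
    }

    // Position of the given absolute offset inside its file.
    static int positionInFile(long commitLogOffset) {
        return (int) (commitLogOffset % FILE_SIZE);
    }

    // Absolute offset for a position inside a file, as printed in store.log.
    static long absoluteOffset(long fileFromOffset, int pos) {
        return fileFromOffset + pos;
    }

    public static void main(String[] args) {
        // First WARN line in store.log: pos 630412830 in the file starting at 36507222016.
        System.out.println(absoluteOffset(36507222016L, 630412830));
        // commitLogOffset from the Arthas output.
        System.out.println(fileFromOffset(48902747609L) + " + " + positionInFile(48902747609L));
    }
}
```

With these numbers, the file in the log starts at exactly 34 times the file size, and the rejected positions are well inside the 1 GiB file, so the requests do not point past the end of the file itself.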

From the source code, the cause is that `org.apache.rocketmq.store.MappedFile#selectMappedBuffer(int, int)` cannot find the message and logs the WARN entry `selectMappedBuffer request pos invalid`, which in turn makes `org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#getHalfMessageByOffset` return the `Find prepared transaction message failed` error.
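
One way to picture how `transientStorePoolEnable` could be involved is the simplified model below. It is not the RocketMQ implementation: `MappedFileModel` is a hypothetical class, and it rests on the assumption that, when a write buffer from the transient store pool is in use, the readable position follows the committed position rather than the written position. That assumption should be checked against the `MappedFile` source for 4.8.0.

```java
// Simplified, hypothetical model of a commit log file. It only illustrates
// the assumed relationship between written, committed, and readable positions.
final class MappedFileModel {
    private final boolean hasWriteBuffer; // models transientStorePoolEnable=true
    private int wrotePosition;            // bytes appended so far
    private int committedPosition;        // bytes already moved out of the write buffer

    MappedFileModel(boolean hasWriteBuffer) {
        this.hasWriteBuffer = hasWriteBuffer;
    }

    // Appends a message of the given size.
    void append(int bytes) {
        wrotePosition += bytes;
    }

    // Models the periodic commit that makes buffered data readable.
    void commit() {
        committedPosition = wrotePosition;
    }

    // Assumption: with a write buffer, only committed data is readable.
    int readPosition() {
        return hasWriteBuffer ? committedPosition : wrotePosition;
    }

    // A lookup succeeds only if the requested range ends within the readable region.
    boolean canSelect(int pos, int size) {
        return pos + size <= readPosition();
    }

    public static void main(String[] args) {
        MappedFileModel file = new MappedFileModel(true);
        file.append(200);                           // half message written
        System.out.println(file.canSelect(0, 4));   // lookup before commit
        file.commit();
        System.out.println(file.canSelect(0, 4));   // lookup after commit
    }
}
```

If the assumption holds, a lookup for a half message that has been written but not yet committed would take the "request pos invalid" branch. That would be consistent with the suspicion about `transientStorePoolEnable`, but it does not prove it.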


### Broker configuration
```
serverSelectorThreads=3
brokerRole=ASYNC_MASTER
serverSocketRcvBufSize=131072
osPageCacheBusyTimeOutMills=1000
shortPollingTimeMills=1000
clientSocketRcvBufSize=131072
clusterTopicEnable=true
brokerTopicEnable=true
autoCreateTopicEnable=true
maxErrorRateOfBloomFilter=20
maxMsgsNumBatch=64
cleanResourceInterval=10000
commercialBaseCount=1
maxTransferCountOnMessageInMemory=32
brokerFastFailureEnable=true
brokerClusterName=rocketmq-cluster
flushDiskType=ASYNC_FLUSH
mapedFileSizeCommitLog=1073741824
commercialBigCount=1
mappedFileSizeConsumeQueue=6000000
consumerFallbehindThreshold=17179869184
autoCreateSubscriptionGroup=true
transientStorePoolEnable=true
flushConsumerOffsetInterval=5000
checkTransactionMessageEnable=false
waitTimeMillsInHeartbeatQueue=31000
diskMaxUsedSpaceRatio=88
flushCommitLogLeastPages=4
cleanFileForciblyEnable=true
slaveReadEnable=true
msgTraceTopicName=RMQ_SYS_TRACE_TOPIC
expectConsumerNumUseFilter=32
traceTopicEnable=false
useEpollNativeSelector=false
enablePropertyFilter=false
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
deleteCommitLogFilesInterval=100
brokerName=broker-a
maxTransferBytesOnMessageInDisk=65536
mapedFileSizeConsumeQueue=300000
listenPort=23301
flushConsumeQueueLeastPages=2
pullMessageThreadPoolNums=128
useReentrantLockWhenPutMessage=true
flushIntervalConsumeQueue=1000
sendThreadPoolQueueCapacity=10000
debugLockEnable=false
haHousekeepingInterval=20000
diskFallRecorded=true
messageIndexEnable=true
clientAsyncSemaphoreValue=65535
clientCallbackExecutorThreads=8
putMsgIndexHightWater=600000
sendMessageThreadPoolNums=32
clientManagerThreadPoolQueueCapacity=1000000
serverSocketSndBufSize=131072
maxDelayTime=40
clientSocketSndBufSize=131072
namesrvAddr=ip1;ip2;ip3
commercialEnable=true
maxHashSlotNum=5000000
heartbeatThreadPoolNums=8
transactionTimeOut=6000
maxMessageSize=65536
adminBrokerThreadPoolNums=16
defaultQueryMaxNum=32
maxTransferBytesOnMessageInMemory=262144
forceRegister=true
enableConsumeQueueExt=false
longPollingEnable=true
serverWorkerThreads=8
messageIndexSafe=false
deleteConsumeQueueFilesInterval=100
haSlaveFallbehindMax=268435456
serverCallbackExecutorThreads=0
flushCommitLogThoroughInterval=10000
isEnableBatchPush=false
commercialTimerCount=1
enableDLegerCommitLog=false
useTLS=false
redeleteHangedFileInterval=120000
flushIntervalCommitLog=500
rocketmqHome=/app/rocketmq
queryMessageThreadPoolNums=16
destroyMapedFileInterval=120000
messageStorePlugIn=
serverChannelMaxIdleTimeSeconds=120
maxIndexNum=20000000
filterDataCleanTimeSpan=86400000
filterServerNums=0
commitCommitLogLeastPages=4
waitTimeMillsInPullQueue=5000
haSendHeartbeatInterval=5000
processReplyMessageThreadPoolNums=32
clientChannelMaxIdleTimeSeconds=120
filterSupportRetry=false
flushDelayOffsetInterval=10000
duplicationEnable=false
replyThreadPoolQueueCapacity=10000
offsetCheckInSlave=false
clientCloseSocketIfTimeout=false
transientStorePoolSize=10
waitTimeMillsInSendQueue=5000
warmMapedFileEnable=false
endTransactionThreadPoolNums=24
flushCommitLogTimed=false
flushLeastPagesWhenWarmMapedFile=4096
clientWorkerThreads=4
endTransactionPoolQueueCapacity=100000
registerNameServerPeriod=30000
registerBrokerTimeoutMills=6000
accessMessageInMemoryMaxRatio=40
highSpeedMode=false
transactionCheckMax=15
checkCRCOnRecover=true
destroyMapedFileIntervalForcibly=120000
brokerIP2=ip1
brokerIP1=ip2
commitIntervalCommitLog=200
clientOnewaySemaphoreValue=65535
storeReplyMessageEnable=true
traceOn=true
clientManageThreadPoolNums=32
channelNotActiveInterval=60000
mappedFileSizeConsumeQueueExt=50331648
consumerManagerThreadPoolQueueCapacity=1000000
serverOnewaySemaphoreValue=256
#haListenPort=23311
enableCalcFilterBitMap=false
clientPooledByteBufAllocatorEnable=false
aclEnable=false
storePathRootDir=/app/rocketmq/data
syncFlushTimeout=5000
rejectTransactionMessage=false
commitCommitLogThoroughInterval=200
connectTimeoutMillis=3000
queryThreadPoolQueueCapacity=20000
regionId=DefaultRegion
consumerManageThreadPoolNums=32
disableConsumeIfConsumerReadSlowly=false
flushConsumerOffsetHistoryInterval=60000
fetchNamesrvAddrByAddressServer=false
haTransferBatchSize=32768
compressedRegister=false
storePathCommitLog=/app/rocketmq/data/commitlog
commercialTransCount=1
transactionCheckInterval=60000
mappedFileSizeCommitLog=1073741824
startAcceptSendRequestTimeStamp=0
serverPooledByteBufAllocatorEnable=true
serverAsyncSemaphoreValue=64
autoDeleteUnusedStats=false
waitTimeMillsInTransactionQueue=3000
heartbeatThreadPoolQueueCapacity=50000
deleteWhen=04
bitMapLengthConsumeQueueExt=112
fastFailIfNoBufferInStorePool=false
defaultTopicQueueNums=4
flushConsumeQueueThoroughInterval=60000
notifyConsumerIdsChangedEnable=true
brokerPermission=6
fileReservedTime=120
transferMsgByHeap=true
pullThreadPoolQueueCapacity=100000
brokerId=0
maxTransferCountOnMessageInDisk=8
traceTopicEnable=true
```
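
The listing above contains `traceTopicEnable` twice with different values (`false`, then `true`). Which value takes effect depends on how the file is loaded, so it may be worth scanning the config for repeated keys. A minimal sketch, where the class `DuplicateKeys` and its method are hypothetical and the parsing handles only simple `key=value` lines:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: reports keys that appear more than once in a
// properties-style listing, together with every value they were given.
final class DuplicateKeys {
    static Map<String, List<String>> find(List<String> lines) {
        Map<String, List<String>> seen = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            // Skip blank lines and comments such as "#haListenPort=23311".
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            int eq = trimmed.indexOf('=');
            if (eq <= 0) {
                continue; // not a key=value line
            }
            String key = trimmed.substring(0, eq).trim();
            String value = trimmed.substring(eq + 1).trim();
            seen.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        // Keep only the keys that occurred at least twice.
        seen.values().removeIf(values -> values.size() < 2);
        return seen;
    }

    public static void main(String[] args) {
        List<String> sample = new ArrayList<>();
        sample.add("traceTopicEnable=false");
        sample.add("brokerId=0");
        sample.add("#haListenPort=23311");
        sample.add("traceTopicEnable=true");
        System.out.println(find(sample));
    }
}
```

This only catches exact repeats. Similarly spelled keys, such as `mapedFileSizeConsumeQueue` and `mappedFileSizeConsumeQueue` in the listing, would need a separate check.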




GitHub link: https://github.com/apache/rocketmq/discussions/6954

----
This is an automatically sent email for dev@rocketmq.apache.org.
To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org
