This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new fe5c17cb8b Optimized the function naming (#9935)
fe5c17cb8b is described below
commit fe5c17cb8b4196e13dbfc3230e0f7dece9c9f6a7
Author: Drizzle <[email protected]>
AuthorDate: Fri Dec 19 16:03:49 2025 +0800
Optimized the function naming (#9935)
* add isWakeCommitWhenPutMessage for AIO
* optimize the Function name
Change-Id: Id91e3eb9c4488fb9804fb2c105082657e66c44c0
* optimized the function naming
Change-Id: Ifc482f91220ff328e5c5425a57a04ac627e8d469
---------
Co-authored-by: drizzle.zk <[email protected]>
---
.../main/java/org/apache/rocketmq/broker/BrokerController.java | 2 +-
.../apache/rocketmq/broker/processor/EndTransactionProcessor.java | 6 +++---
.../transaction/rocksdb/TransactionalMessageRocksDBService.java | 2 +-
store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 8 ++++----
.../main/java/org/apache/rocketmq/store/DefaultMessageStore.java | 6 +++---
store/src/main/java/org/apache/rocketmq/store/MessageStore.java | 6 +++---
.../java/org/apache/rocketmq/store/timer/TimerMessageStore.java | 8 ++++----
.../java/org/apache/rocketmq/tieredstore/TieredMessageStore.java | 6 +++---
8 files changed, 22 insertions(+), 22 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index a09e2173b6..efc2949364 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -878,7 +878,7 @@ public class BrokerController {
}
if (messageStoreConfig.isTransRocksDBEnable()) {
this.transMessageRocksDBStore = new
TransMessageRocksDBStore(messageStore, brokerStatsManager, new
InetSocketAddress(this.getBrokerConfig().getBrokerIP1(),
this.getNettyServerConfig().getListenPort()));
-
this.messageStore.setTransRocksDBStore(transMessageRocksDBStore);
+
this.messageStore.setTransMessageRocksDBStore(transMessageRocksDBStore);
}
} catch (Exception e) {
result = false;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index 7298e5da58..f90b534204 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -203,7 +203,7 @@ public class EndTransactionProcessor implements
NettyRequestProcessor {
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(halfTopic)) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(prepareMessage);
} else if
(this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() &&
TopicValidator.RMQ_SYS_ROCKSDB_TRANS_HALF_TOPIC.equals(halfTopic)) {
-
this.brokerController.getMessageStore().getTransRocksDBStore().deletePrepareMessage(prepareMessage);
+
this.brokerController.getMessageStore().getTransMessageRocksDBStore().deletePrepareMessage(prepareMessage);
} else {
LOGGER.warn("deletePrepareMessage error, topic of half message is:
{}, transRocksDBEnable: {}", halfTopic,
this.brokerController.getMessageStoreConfig().isTransRocksDBEnable());
}
@@ -287,8 +287,8 @@ public class EndTransactionProcessor implements
NettyRequestProcessor {
long tagsCodeValue =
MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
msgInner.setTagsCode(tagsCodeValue);
String checkTimes =
msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
- if (StringUtils.isEmpty(checkTimes) &&
this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null !=
this.brokerController.getMessageStore().getTransRocksDBStore()) {
- Integer checkTimesRocksDB =
this.brokerController.getMessageStore().getTransRocksDBStore().getCheckTimes(msgInner.getTopic(),
msgInner.getTransactionId(), msgExt.getCommitLogOffset());
+ if (StringUtils.isEmpty(checkTimes) &&
this.brokerController.getMessageStoreConfig().isTransRocksDBEnable() && null !=
this.brokerController.getMessageStore().getTransMessageRocksDBStore()) {
+ Integer checkTimesRocksDB =
this.brokerController.getMessageStore().getTransMessageRocksDBStore().getCheckTimes(msgInner.getTopic(),
msgInner.getTransactionId(), msgExt.getCommitLogOffset());
if (null != checkTimesRocksDB && checkTimesRocksDB >= 0) {
msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES,
String.valueOf(checkTimesRocksDB));
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
index 389c75e426..1fc38eb3d6 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
@@ -58,7 +58,7 @@ public class TransactionalMessageRocksDBService {
public TransactionalMessageRocksDBService(final MessageStore messageStore,
final BrokerController brokerController) {
this.messageStore = messageStore;
- this.transMessageRocksDBStore = messageStore.getTransRocksDBStore();
+ this.transMessageRocksDBStore =
messageStore.getTransMessageRocksDBStore();
this.messageRocksDBStorage =
transMessageRocksDBStore.getMessageRocksDBStorage();
this.brokerController = brokerController;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 459f2074b2..286f31cd4a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -111,9 +111,9 @@ public class CommitLog implements Swappable {
public CommitLog(final DefaultMessageStore messageStore) {
String storePath =
messageStore.getMessageStoreConfig().getStorePathCommitLog();
- RunningFlags runningFlags =
messageStore.getMessageStoreConfig().isEnableRunningFlagsInFlush()
+ RunningFlags runningFlags =
messageStore.getMessageStoreConfig().isEnableRunningFlagsInFlush()
? messageStore.getRunningFlags() : null;
-
+
if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new
MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(),
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
@@ -927,8 +927,8 @@ public class CommitLog implements Swappable {
private boolean isMappedFileMatchedRecover(long phyOffset, long
storeTimestamp, boolean recoverNormally) throws RocksDBException {
boolean result =
this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset,
storeTimestamp, recoverNormally);
- if (null != this.defaultMessageStore.getTransRocksDBStore() &&
defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() &&
!defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable())
{
- result = result &&
this.defaultMessageStore.getTransRocksDBStore().isMappedFileMatchedRecover(phyOffset);
+ if (null != this.defaultMessageStore.getTransMessageRocksDBStore() &&
defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() &&
!defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable())
{
+ result = result &&
this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset);
}
if (null != this.defaultMessageStore.getIndexRocksDBStore() &&
defaultMessageStore.getMessageStoreConfig().isIndexRocksDBEnable()) {
result = result &&
this.defaultMessageStore.getIndexRocksDBStore().isMappedFileMatchedRecover(phyOffset);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 7848b76016..aae6d50da9 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1080,12 +1080,12 @@ public class DefaultMessageStore implements
MessageStore {
}
@Override
- public TimerMessageRocksDBStore getTimerRocksDBStore() {
+ public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
return this.timerMessageRocksDBStore;
}
@Override
- public TransMessageRocksDBStore getTransRocksDBStore() {
+ public TransMessageRocksDBStore getTransMessageRocksDBStore() {
return this.transMessageRocksDBStore;
}
@@ -1100,7 +1100,7 @@ public class DefaultMessageStore implements MessageStore {
}
@Override
- public void setTransRocksDBStore(TransMessageRocksDBStore
transMessageRocksDBStore) {
+ public void setTransMessageRocksDBStore(TransMessageRocksDBStore
transMessageRocksDBStore) {
this.transMessageRocksDBStore = transMessageRocksDBStore;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index b297ee542f..2490bb5b2f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -210,15 +210,15 @@ public interface MessageStore {
TimerMessageStore getTimerMessageStore();
- TimerMessageRocksDBStore getTimerRocksDBStore();
+ TimerMessageRocksDBStore getTimerMessageRocksDBStore();
- TransMessageRocksDBStore getTransRocksDBStore();
+ TransMessageRocksDBStore getTransMessageRocksDBStore();
void setTimerMessageStore(TimerMessageStore timerMessageStore);
void setTimerMessageRocksDBStore(TimerMessageRocksDBStore
timerMessageRocksDBStore);
- void setTransRocksDBStore(TransMessageRocksDBStore
transMessageRocksDBStore);
+ void setTransMessageRocksDBStore(TransMessageRocksDBStore
transMessageRocksDBStore);
/**
* Get the offset of the message in the commit log, which is also known as
physical offset.
diff --git
a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 53999e72c4..a32b4a3f21 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -318,7 +318,7 @@ public class TimerMessageStore {
}
currQueueOffset = Math.min(currQueueOffset,
timerCheckpoint.getMasterTimerQueueOffset());
if (storeConfig.isTimerRocksDBEnable()) {
- long commitOffsetInRocksDB =
messageStore.getTimerRocksDBStore().getCommitOffsetInRocksDB();
+ long commitOffsetInRocksDB =
messageStore.getTimerMessageRocksDBStore().getCommitOffsetInRocksDB();
LOGGER.info("recover time wheel, currQueueOffset: {},
commitOffsetInRocksDB: {}", currQueueOffset, commitOffsetInRocksDB);
currQueueOffset = Math.max(currQueueOffset, commitOffsetInRocksDB);
}
@@ -2087,12 +2087,12 @@ public class TimerMessageStore {
LOGGER.error("recallToTimeline param error, delayTime: {},
offsetPy: {}, sizePy: {}, messageExt: {}", delayTime, offsetPy, sizePy,
messageExt);
return;
}
- if (null == messageStore.getTimerRocksDBStore() || null ==
messageStore.getTimerRocksDBStore().getTimeline()) {
+ if (null == messageStore.getTimerMessageRocksDBStore() || null ==
messageStore.getTimerMessageRocksDBStore().getTimeline()) {
LOGGER.error("recallToTimeline error, timerRocksDBStore is null or
timeline is null");
return;
}
try {
-
messageStore.getTimerRocksDBStore().getTimeline().putDeleteRecord(delayTime,
messageExt.getMsgId(), offsetPy, sizePy, messageExt.getQueueOffset(),
messageExt);
+
messageStore.getTimerMessageRocksDBStore().getTimeline().putDeleteRecord(delayTime,
messageExt.getMsgId(), offsetPy, sizePy, messageExt.getQueueOffset(),
messageExt);
} catch (Exception e) {
LOGGER.error("recallToTimeline error: {}", e.getMessage());
}
@@ -2109,7 +2109,7 @@ public class TimerMessageStore {
LOGGER.info("restart TimerMessageStore has been running");
return true;
}
- long commitOffsetRocksDB =
this.messageStore.getTimerRocksDBStore().getCommitOffsetInRocksDB();
+ long commitOffsetRocksDB =
this.messageStore.getTimerMessageRocksDBStore().getCommitOffsetInRocksDB();
long commitOffsetFile =
this.messageStore.getTimerMessageStore().getCommitQueueOffset();
long maxCommitOffset = Math.max(commitOffsetFile,
commitOffsetRocksDB);
currQueueOffset = maxCommitOffset;
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 3e84f20122..b30f868d19 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -314,12 +314,12 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
}
@Override
- public TimerMessageRocksDBStore getTimerRocksDBStore() {
+ public TimerMessageRocksDBStore getTimerMessageRocksDBStore() {
return timerMessageRocksDBStore;
}
@Override
- public TransMessageRocksDBStore getTransRocksDBStore() {
+ public TransMessageRocksDBStore getTransMessageRocksDBStore() {
return transMessageRocksDBStore;
}
@@ -329,7 +329,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
}
@Override
- public void setTransRocksDBStore(TransMessageRocksDBStore
transMessageRocksDBStore) {
+ public void setTransMessageRocksDBStore(TransMessageRocksDBStore
transMessageRocksDBStore) {
this.transMessageRocksDBStore = transMessageRocksDBStore;
}