yuz10 commented on a change in pull request #2854:
URL: https://github.com/apache/rocketmq/pull/2854#discussion_r664159239
##########
File path: store/src/main/java/org/apache/rocketmq/store/CommitLog.java
##########
@@ -785,41 +786,24 @@ public long getBeginTimeInLock() {
}
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
- // Set the storage time
- msg.setStoreTimestamp(System.currentTimeMillis());
- // Set the message body BODY CRC (consider the most appropriate setting
- // on the client)
- msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
+ setIPV6Flags(msg);
+
// Back to Results
AppendMessageResult result = null;
+ try {
+ result = appendPutMessageAndTrackStats(msg);
+ } catch (Exceptions.PutMessageException ex) {
+ return ex.toPutMessageResult();
+ }
+ PutMessageResult putMessageResult = new
PutMessageResult(PutMessageStatus.PUT_OK, result);
- StoreStatsService storeStatsService =
this.defaultMessageStore.getStoreStatsService();
-
- String topic = msg.getTopic();
- int queueId = msg.getQueueId();
-
- final int tranType =
MessageSysFlag.getTransactionValue(msg.getSysFlag());
- if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- // Delay Delivery
- if (msg.getDelayTimeLevel() > 0) {
- if (msg.getDelayTimeLevel() >
this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
-
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
- }
-
- topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
- queueId =
ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
-
- // Backup real topic, queueId
- MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
- MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
-
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
+ handleDiskFlush(result, putMessageResult, msg);
+ handleHA(result, putMessageResult, msg);
- msg.setTopic(topic);
- msg.setQueueId(queueId);
- }
- }
+ return putMessageResult;
+ }
+ private void setIPV6Flags(MessageExtBrokerInner msg) {
Review comment:
I dont think so, I just mean that another place could also use the
function instead of writing the same code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]