TerrellChen commented on a change in pull request #2406:
URL: https://github.com/apache/rocketmq/pull/2406#discussion_r520638172
##########
File path:
store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
##########
@@ -507,7 +513,122 @@ public PutMessageResult putMessage(final
MessageExtBrokerInner msg) {
@Override
public PutMessageResult putMessages(final MessageExtBatch messageExtBatch)
{
- return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
+ final int tranType =
MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
+
+ if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
null);
+ }
+ if (messageExtBatch.getDelayTimeLevel() > 0) {
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
null);
+ }
+
+ // Set the storage time
+ messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
+
+ StoreStatsService storeStatsService =
this.defaultMessageStore.getStoreStatsService();
+
+ InetSocketAddress bornSocketAddress = (InetSocketAddress)
messageExtBatch.getBornHost();
+ if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+ messageExtBatch.setBornHostV6Flag();
+ }
+
+ InetSocketAddress storeSocketAddress = (InetSocketAddress)
messageExtBatch.getStoreHost();
+ if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+ messageExtBatch.setStoreHostAddressV6Flag();
+ }
+
+ // Back to Results
+ AppendMessageResult appendResult;
+ BatchAppendFuture<AppendEntryResponse> dledgerFuture;
+ EncodeResult encodeResult;
+
+ putMessageLock.lock(); //spin or ReentrantLock ,depending on store
config
+ msgIdBuilder.setLength(0);
+ long elapsedTimeInLock;
+ long queueOffset;
+ long msgNum = 0;
+ try {
+ beginTimeInDledgerLock =
this.defaultMessageStore.getSystemClock().now();
+ encodeResult = this.messageSerializer.serialize(messageExtBatch);
+ queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+ if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+ return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
new AppendMessageResult(encodeResult
+ .status));
+ }
+ BatchAppendEntryRequest request = new BatchAppendEntryRequest();
+ request.setGroup(dLedgerConfig.getGroup());
+ request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+ request.setBatchMsgs(encodeResult.batchData);
+ dledgerFuture = (BatchAppendFuture<AppendEntryResponse>)
dLedgerServer.handleAppend(request);
+ if (dledgerFuture.getPos() == -1) {
+ log.warn("[DEBUG_CTR] handleAppend return false due to error
code {}", dledgerFuture.get().getCode());
+ return new
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
+ }
+ long wroteOffset = 0;
+
+ int msgIdLength = (messageExtBatch.getSysFlag() &
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+ ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+ for (long pos : dledgerFuture.getPositions()) {
+ wroteOffset = pos + DLedgerEntry.BODY_OFFSET;
+ String msgId = MessageDecoder.createMessageId(buffer,
messageExtBatch.getStoreHostBytes(), wroteOffset);
+ if (msgIdBuilder.length() > 0) {
+ msgIdBuilder.append(',').append(msgId);
+ } else {
+ msgIdBuilder.append(msgId);
+ }
+ msgNum++;
+ }
+ elapsedTimeInLock =
this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+ appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK,
wroteOffset, encodeResult.totalMsgLen,
+ msgIdBuilder.toString(), System.currentTimeMillis(),
queueOffset, elapsedTimeInLock);
+
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey,
queueOffset + msgNum);
Review comment:
Good job!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org