This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 845c5fd [ISSUE #1846] Dledger model change into pipeline manner to
improve performance (#1847)
845c5fd is described below
commit 845c5fd61886aff99185a709ebf69d833a38b9c6
Author: rongtong <[email protected]>
AuthorDate: Fri Oct 23 11:55:31 2020 +0800
[ISSUE #1846] Dledger model change into pipeline manner to improve
performance (#1847)
* enhancement(dledger):implement asyncPutMessage in dledger commitlog
* enhancement(dledger):move serialization out of lock
* fix(dledger):fix the issue that queueOffset is overwritten
* fix(dledger):fix the issue of getting the wrong queueOffset
* test(dledger):add dledgerCommitlog put messages async unit test
* chore(dledger): fix the "cannot find symbol" error for the variable
SCHEDULE_TOPIC
---
.../rocketmq/common/message/MessageDecoder.java | 1 +
.../rocketmq/store/dledger/DLedgerCommitLog.java | 274 ++++++++++++++-------
.../store/dledger/DLedgerCommitlogTest.java | 44 ++++
3 files changed, 231 insertions(+), 88 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
index d048dde..7e86d84 100644
---
a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
+++
b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java
@@ -42,6 +42,7 @@ public class MessageDecoder {
public static final char NAME_VALUE_SEPARATOR = 1;
public static final char PROPERTY_SEPARATOR = 2;
public static final int PHY_POS_POSITION = 4 + 4 + 4 + 4 + 4 + 8;
+ public static final int QUEUE_OFFSET_POSITION = 4 + 4 + 4 + 4 + 4;
public static final int SYSFLAG_POSITION = 4 + 4 + 4 + 4 + 4 + 8 + 8;
// public static final int BODY_SIZE_POSITION = 4 // 1 TOTALSIZE
// + 4 // 2 MAGICCODE
diff --git
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 400ad78..24e0f69 100644
---
a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++
b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -28,6 +28,8 @@ import io.openmessaging.storage.dledger.store.file.MmapFile;
import io.openmessaging.storage.dledger.store.file.MmapFileList;
import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
+import java.net.Inet6Address;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
@@ -364,21 +366,14 @@ public class DLedgerCommitLog extends CommitLog {
return beginTimeInDledgerLock;
}
- @Override
- public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+ private void setMessageInfo(MessageExtBrokerInner msg, int tranType) {
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
- StoreStatsService storeStatsService =
this.defaultMessageStore.getStoreStatsService();
-
- String topic = msg.getTopic();
- int queueId = msg.getQueueId();
-
//should be consistent with the old version
- final int tranType =
MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
@@ -387,8 +382,9 @@ public class DLedgerCommitLog extends CommitLog {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
- topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
- queueId =
ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
+
+ String topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
+ int queueId =
ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg,
MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
@@ -400,6 +396,25 @@ public class DLedgerCommitLog extends CommitLog {
}
}
+ InetSocketAddress bornSocketAddress = (InetSocketAddress)
msg.getBornHost();
+ if (bornSocketAddress.getAddress() instanceof Inet6Address) {
+ msg.setBornHostV6Flag();
+ }
+
+ InetSocketAddress storeSocketAddress = (InetSocketAddress)
msg.getStoreHost();
+ if (storeSocketAddress.getAddress() instanceof Inet6Address) {
+ msg.setStoreHostAddressV6Flag();
+ }
+ }
+
+ @Override
+ public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
+
+ StoreStatsService storeStatsService =
this.defaultMessageStore.getStoreStatsService();
+ final int tranType =
MessageSysFlag.getTransactionValue(msg.getSysFlag());
+ String topic = msg.getTopic();
+ setMessageInfo(msg,tranType);
+
// Back to Results
AppendMessageResult appendResult;
AppendFuture<AppendEntryResponse> dledgerFuture;
@@ -411,14 +426,15 @@ public class DLedgerCommitLog extends CommitLog {
try {
beginTimeInDledgerLock =
this.defaultMessageStore.getSystemClock().now();
encodeResult = this.messageSerializer.serialize(msg);
- queueOffset = topicQueueTable.get(encodeResult.queueOffsetKey);
+ queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey,
tranType);
+ encodeResult.setQueueOffsetKey(queueOffset);
if (encodeResult.status != AppendMessageStatus.PUT_OK) {
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL,
new AppendMessageResult(encodeResult.status));
}
AppendEntryRequest request = new AppendEntryRequest();
request.setGroup(dLedgerConfig.getGroup());
request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
- request.setBody(encodeResult.data);
+ request.setBody(encodeResult.getData());
dledgerFuture = (AppendFuture<AppendEntryResponse>)
dLedgerServer.handleAppend(request);
if (dledgerFuture.getPos() == -1) {
return new
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR));
@@ -430,7 +446,7 @@ public class DLedgerCommitLog extends CommitLog {
String msgId = MessageDecoder.createMessageId(buffer,
msg.getStoreHostBytes(), wroteOffset);
elapsedTimeInLock =
this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
- appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK,
wroteOffset, encodeResult.data.length, msgId, System.currentTimeMillis(),
queueOffset, elapsedTimeInLock);
+ appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK,
wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(),
queueOffset, elapsedTimeInLock);
switch (tranType) {
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
@@ -496,12 +512,104 @@ public class DLedgerCommitLog extends CommitLog {
@Override
public CompletableFuture<PutMessageResult>
asyncPutMessage(MessageExtBrokerInner msg) {
- return CompletableFuture.completedFuture(this.putMessage(msg));
+
+ StoreStatsService storeStatsService =
this.defaultMessageStore.getStoreStatsService();
+
+ final int tranType =
MessageSysFlag.getTransactionValue(msg.getSysFlag());
+
+ setMessageInfo(msg, tranType);
+
+ final String finalTopic = msg.getTopic();
+
+ // Back to Results
+ AppendMessageResult appendResult;
+ AppendFuture<AppendEntryResponse> dledgerFuture;
+ EncodeResult encodeResult;
+
+ encodeResult = this.messageSerializer.serialize(msg);
+ if (encodeResult.status != AppendMessageStatus.PUT_OK) {
+ return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new
AppendMessageResult(encodeResult.status)));
+ }
+ putMessageLock.lock(); //spin or ReentrantLock ,depending on store
config
+ long elapsedTimeInLock;
+ long queueOffset;
+ try {
+ beginTimeInDledgerLock =
this.defaultMessageStore.getSystemClock().now();
+ queueOffset = getQueueOffsetByKey(encodeResult.queueOffsetKey,
tranType);
+ encodeResult.setQueueOffsetKey(queueOffset);
+ AppendEntryRequest request = new AppendEntryRequest();
+ request.setGroup(dLedgerConfig.getGroup());
+ request.setRemoteId(dLedgerServer.getMemberState().getSelfId());
+ request.setBody(encodeResult.getData());
+ dledgerFuture = (AppendFuture<AppendEntryResponse>)
dLedgerServer.handleAppend(request);
+ if (dledgerFuture.getPos() == -1) {
+ return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ }
+ long wroteOffset = dledgerFuture.getPos() +
DLedgerEntry.BODY_OFFSET;
+
+ int msgIdLength = (msg.getSysFlag() &
MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
+ ByteBuffer buffer = ByteBuffer.allocate(msgIdLength);
+
+ String msgId = MessageDecoder.createMessageId(buffer,
msg.getStoreHostBytes(), wroteOffset);
+ elapsedTimeInLock =
this.defaultMessageStore.getSystemClock().now() - beginTimeInDledgerLock;
+ appendResult = new AppendMessageResult(AppendMessageStatus.PUT_OK,
wroteOffset, encodeResult.getData().length, msgId, System.currentTimeMillis(),
queueOffset, elapsedTimeInLock);
+ switch (tranType) {
+ case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+ break;
+ case MessageSysFlag.TRANSACTION_NOT_TYPE:
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+ // The next update ConsumeQueue information
+
DLedgerCommitLog.this.topicQueueTable.put(encodeResult.queueOffsetKey,
queueOffset + 1);
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ log.error("Put message error", e);
+ return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR)));
+ } finally {
+ beginTimeInDledgerLock = 0;
+ putMessageLock.unlock();
+ }
+
+ if (elapsedTimeInLock > 500) {
+ log.warn("[NOTIFYME]putMessage in lock cost time(ms)={},
bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length,
appendResult);
+ }
+
+ return dledgerFuture.thenApply(appendEntryResponse -> {
+ PutMessageStatus putMessageStatus = PutMessageStatus.UNKNOWN_ERROR;
+ switch
(DLedgerResponseCode.valueOf(appendEntryResponse.getCode())) {
+ case SUCCESS:
+ putMessageStatus = PutMessageStatus.PUT_OK;
+ break;
+ case INCONSISTENT_LEADER:
+ case NOT_LEADER:
+ case LEADER_NOT_READY:
+ case DISK_FULL:
+ putMessageStatus = PutMessageStatus.SERVICE_NOT_AVAILABLE;
+ break;
+ case WAIT_QUORUM_ACK_TIMEOUT:
+ //Do not return flush_slave_timeout to the client, for the
ons client will ignore it.
+ putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+ break;
+ case LEADER_PENDING_FULL:
+ putMessageStatus = PutMessageStatus.OS_PAGECACHE_BUSY;
+ break;
+ }
+ PutMessageResult putMessageResult = new
PutMessageResult(putMessageStatus, appendResult);
+ if (putMessageStatus == PutMessageStatus.PUT_OK) {
+ // Statistics
+
storeStatsService.getSinglePutMessageTopicTimesTotal(finalTopic).incrementAndGet();
+
storeStatsService.getSinglePutMessageTopicSizeTotal(msg.getTopic()).addAndGet(appendResult.getWroteBytes());
+ }
+ return putMessageResult;
+ });
}
@Override
public CompletableFuture<PutMessageResult>
asyncPutMessages(MessageExtBatch messageExtBatch) {
- return CompletableFuture.completedFuture(putMessages(messageExtBatch));
+ return CompletableFuture.completedFuture(new
PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
}
@Override
@@ -566,51 +674,69 @@ public class DLedgerCommitLog extends CommitLog {
return diff;
}
+ private long getQueueOffsetByKey(String key, int tranType) {
+ Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
+ if (null == queueOffset) {
+ queueOffset = 0L;
+ DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
+ }
+
+ // Transaction messages that require special handling
+ switch (tranType) {
+ // Prepared and Rollback message is not consumed, will not enter
the
+ // consumer queuec
+ case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+ queueOffset = 0L;
+ break;
+ case MessageSysFlag.TRANSACTION_NOT_TYPE:
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+ default:
+ break;
+ }
+ return queueOffset;
+ }
+
+
class EncodeResult {
private String queueOffsetKey;
- private byte[] data;
+ private ByteBuffer data;
private AppendMessageStatus status;
- public EncodeResult(AppendMessageStatus status, byte[] data, String
queueOffsetKey) {
+ public EncodeResult(AppendMessageStatus status, ByteBuffer data,
String queueOffsetKey) {
this.data = data;
this.status = status;
this.queueOffsetKey = queueOffsetKey;
}
+
+ public void setQueueOffsetKey(long offset) {
+ data.putLong(MessageDecoder.QUEUE_OFFSET_POSITION, offset);
+ }
+
+ public byte[] getData() {
+ return data.array();
+ }
}
class MessageSerializer {
- // File at the end of the minimum fixed length empty
- private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
- private final ByteBuffer msgIdMemory;
- private final ByteBuffer msgIdV6Memory;
- // Store the message content
- private final ByteBuffer msgStoreItemMemory;
+
// The maximum length of the message
private final int maxMessageSize;
// Build Message Key
private final StringBuilder keyBuilder = new StringBuilder();
- private final StringBuilder msgIdBuilder = new StringBuilder();
-
-// private final ByteBuffer hostHolder = ByteBuffer.allocate(8);
-
MessageSerializer(final int size) {
- this.msgIdMemory = ByteBuffer.allocate(4 + 4 + 8);
- this.msgIdV6Memory = ByteBuffer.allocate(16 + 4 + 8);
- this.msgStoreItemMemory = ByteBuffer.allocate(size +
END_FILE_MIN_BLANK_LENGTH);
this.maxMessageSize = size;
}
- public ByteBuffer getMsgStoreItemMemory() {
- return msgStoreItemMemory;
- }
-
public EncodeResult serialize(final MessageExtBrokerInner msgInner) {
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = 0;
+ long queueOffset = 0;
+
int sysflag = msgInner.getSysFlag();
int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG)
== 0 ? 4 + 4 : 16 + 4;
@@ -618,33 +744,7 @@ public class DLedgerCommitLog extends CommitLog {
ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);
- // Record ConsumeQueue information
- keyBuilder.setLength(0);
- keyBuilder.append(msgInner.getTopic());
- keyBuilder.append('-');
- keyBuilder.append(msgInner.getQueueId());
- String key = keyBuilder.toString();
-
- Long queueOffset = DLedgerCommitLog.this.topicQueueTable.get(key);
- if (null == queueOffset) {
- queueOffset = 0L;
- DLedgerCommitLog.this.topicQueueTable.put(key, queueOffset);
- }
-
- // Transaction messages that require special handling
- final int tranType =
MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
- switch (tranType) {
- // Prepared and Rollback message is not consumed, will not
enter the
- // consumer queuec
- case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
- case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
- queueOffset = 0L;
- break;
- case MessageSysFlag.TRANSACTION_NOT_TYPE:
- case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
- default:
- break;
- }
+ String key = msgInner.getTopic() + "-" + msgInner.getQueueId();
/**
* Serialize message
@@ -666,6 +766,8 @@ public class DLedgerCommitLog extends CommitLog {
final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength,
topicLength, propertiesLength);
+ ByteBuffer msgStoreItemMemory = ByteBuffer.allocate(msgLen);
+
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
DLedgerCommitLog.log.warn("message size exceeded, msg total
size: " + msgLen + ", msg body size: " + bodyLength
@@ -675,60 +777,56 @@ public class DLedgerCommitLog extends CommitLog {
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
- this.msgStoreItemMemory.putInt(msgLen);
+ msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
-
this.msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
+ msgStoreItemMemory.putInt(DLedgerCommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
- this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
+ msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
- this.msgStoreItemMemory.putInt(msgInner.getQueueId());
+ msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
- this.msgStoreItemMemory.putInt(msgInner.getFlag());
+ msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
- this.msgStoreItemMemory.putLong(queueOffset);
+ msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
- this.msgStoreItemMemory.putLong(wroteOffset);
+ msgStoreItemMemory.putLong(wroteOffset);
// 8 SYSFLAG
- this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
+ msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
- this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
+ msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
- this.resetByteBuffer(bornHostHolder, bornHostLength);
-
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
+ resetByteBuffer(bornHostHolder, bornHostLength);
+ msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
// 11 STORETIMESTAMP
- this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
+ msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
- this.resetByteBuffer(storeHostHolder, storeHostLength);
-
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
+ resetByteBuffer(storeHostHolder, storeHostLength);
+
msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
- this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
+ msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
-
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
+
msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
- this.msgStoreItemMemory.putInt(bodyLength);
+ msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0) {
- this.msgStoreItemMemory.put(msgInner.getBody());
+ msgStoreItemMemory.put(msgInner.getBody());
}
// 16 TOPIC
- this.msgStoreItemMemory.put((byte) topicLength);
- this.msgStoreItemMemory.put(topicData);
+ msgStoreItemMemory.put((byte) topicLength);
+ msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
- this.msgStoreItemMemory.putShort((short) propertiesLength);
+ msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0) {
- this.msgStoreItemMemory.put(propertiesData);
+ msgStoreItemMemory.put(propertiesData);
}
- byte[] data = new byte[msgLen];
- this.msgStoreItemMemory.clear();
- this.msgStoreItemMemory.get(data);
- return new EncodeResult(AppendMessageStatus.PUT_OK, data, key);
+ return new EncodeResult(AppendMessageStatus.PUT_OK,
msgStoreItemMemory, key);
}
private void resetByteBuffer(final ByteBuffer byteBuffer, final int
limit) {
byteBuffer.flip();
byteBuffer.limit(limit);
}
-
}
public static class DLedgerSelectMappedBufferResult extends
SelectMappedBufferResult {
diff --git
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index f0b9205..e31d834 100644
---
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -175,6 +177,48 @@ public class DLedgerCommitlogTest extends
MessageStoreTestBase {
messageStore.shutdown();
}
+ @Test
+ public void testAsyncPutAndGetMessage() throws Exception {
+ String base = createBaseDir();
+ String peers = String.format("n0-localhost:%d", nextPort());
+ String group = UUID.randomUUID().toString();
+ DefaultMessageStore messageStore = createDledgerMessageStore(base,
group, "n0", peers, null, false, 0);
+ Thread.sleep(1000);
+ String topic = UUID.randomUUID().toString();
+
+ List<PutMessageResult> results = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ MessageExtBrokerInner msgInner =
+ i < 5 ? buildMessage() : buildIPv6HostMessage();
+ msgInner.setTopic(topic);
+ msgInner.setQueueId(0);
+ CompletableFuture<PutMessageResult> futureResult =
messageStore.asyncPutMessage(msgInner);
+ PutMessageResult putMessageResult = futureResult.get(3000,
TimeUnit.MILLISECONDS);
+ results.add(putMessageResult);
+ Assert.assertEquals(PutMessageStatus.PUT_OK,
putMessageResult.getPutMessageStatus());
+ Assert.assertEquals(i,
putMessageResult.getAppendMessageResult().getLogicsOffset());
+ }
+ Thread.sleep(100);
+ Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 0));
+ Assert.assertEquals(10, messageStore.getMaxOffsetInQueue(topic, 0));
+ Assert.assertEquals(0, messageStore.dispatchBehindBytes());
+ GetMessageResult getMessageResult = messageStore.getMessage("group",
topic, 0, 0, 32, null);
+ Assert.assertEquals(GetMessageStatus.FOUND,
getMessageResult.getStatus());
+
+ Assert.assertEquals(10,
getMessageResult.getMessageBufferList().size());
+ Assert.assertEquals(10, getMessageResult.getMessageMapedList().size());
+
+ for (int i = 0; i < results.size(); i++) {
+ ByteBuffer buffer = getMessageResult.getMessageBufferList().get(i);
+ MessageExt messageExt = MessageDecoder.decode(buffer);
+ Assert.assertEquals(i, messageExt.getQueueOffset());
+
Assert.assertEquals(results.get(i).getAppendMessageResult().getMsgId(),
messageExt.getMsgId());
+
Assert.assertEquals(results.get(i).getAppendMessageResult().getWroteOffset(),
messageExt.getCommitLogOffset());
+ }
+ messageStore.destroy();
+ messageStore.shutdown();
+ }
+
@Test
public void testCommittedPos() throws Exception {