This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ae7179d75e [ISSUE #8765] fix low performance of delay message when
enable rocksdb consume queue (#8766)
ae7179d75e is described below
commit ae7179d75e11f469d68be05fbf556fde42c8a795
Author: yuz10 <[email protected]>
AuthorDate: Wed Nov 20 11:10:01 2024 +0800
[ISSUE #8765] fix low performance of delay message when enable rocksdb
consume queue (#8766)
* #7538 fix wrong cachedMsgSize if msg body is changed in consumer callback
* [ISSUE #8765] fix low performance of delay message when enable rocksdb
consume queue
* remove prefetch
---
.../rocketmq/store/queue/RocksDBConsumeQueue.java | 74 ++++++++++++++++++----
.../store/queue/RocksDBConsumeQueueTest.java | 73 +++++++++++++++++++++
2 files changed, 136 insertions(+), 11 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 83ba7bebad..7bd3c2e305 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -271,22 +271,17 @@ public class RocksDBConsumeQueue implements
ConsumeQueueInterface {
private int pullNum(long cqOffset, long maxCqOffset) {
long diffLong = maxCqOffset - cqOffset;
if (diffLong < Integer.MAX_VALUE) {
- int diffInt = (int) diffLong;
- return diffInt > 16 ? 16 : diffInt;
+ return (int) diffLong;
}
- return 16;
+ return Integer.MAX_VALUE;
}
@Override
public ReferredIterator<CqUnit> iterateFrom(final long startIndex) {
- try {
- long maxCqOffset = getMaxOffsetInQueue();
- if (startIndex < maxCqOffset) {
- int num = pullNum(startIndex, maxCqOffset);
- return iterateFrom0(startIndex, num);
- }
- } catch (RocksDBException e) {
- log.error("[RocksDBConsumeQueue] iterateFrom error!", e);
+ long maxCqOffset = getMaxOffsetInQueue();
+ if (startIndex < maxCqOffset) {
+ int num = pullNum(startIndex, maxCqOffset);
+ return new LargeRocksDBConsumeQueueIterator(startIndex, num);
}
return null;
}
@@ -428,4 +423,61 @@ public class RocksDBConsumeQueue implements
ConsumeQueueInterface {
}
}
}
+
+ private class LargeRocksDBConsumeQueueIterator implements
ReferredIterator<CqUnit> {
+ private final long startIndex;
+ private final int totalCount;
+ private int currentIndex;
+
+ public LargeRocksDBConsumeQueueIterator(final long startIndex, final
int num) {
+ this.startIndex = startIndex;
+ this.totalCount = num;
+ this.currentIndex = 0;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.currentIndex < this.totalCount;
+ }
+
+
+ @Override
+ public CqUnit next() {
+ if (!hasNext()) {
+ return null;
+ }
+
+ final ByteBuffer byteBuffer;
+ try {
+ byteBuffer = messageStore.getQueueStore().get(topic, queueId,
startIndex + currentIndex);
+ } catch (RocksDBException e) {
+ ERROR_LOG.error("get cq from rocksdb failed. topic: {},
queueId: {}", topic, queueId, e);
+ return null;
+ }
+ if (byteBuffer == null || byteBuffer.remaining() <
RocksDBConsumeQueueTable.CQ_UNIT_SIZE) {
+ return null;
+ }
+ CqUnit cqUnit = new CqUnit(this.startIndex + currentIndex,
byteBuffer.getLong(), byteBuffer.getInt(), byteBuffer.getLong());
+ this.currentIndex++;
+ return cqUnit;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+
+ @Override
+ public void release() {
+ }
+
+ @Override
+ public CqUnit nextAndRelease() {
+ try {
+ return next();
+ } finally {
+ release();
+ }
+ }
+ }
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTest.java
b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTest.java
new file mode 100644
index 0000000000..b907ce5951
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.queue;
+
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.nio.ByteBuffer;
+
+import static
org.apache.rocketmq.store.queue.RocksDBConsumeQueueTable.CQ_UNIT_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RocksDBConsumeQueueTest extends QueueTestBase {
+
+ @Test
+ public void testIterator() throws Exception {
+ if (MixAll.isMac()) {
+ return;
+ }
+ DefaultMessageStore messageStore = mock(DefaultMessageStore.class);
+ RocksDBConsumeQueueStore rocksDBConsumeQueueStore =
mock(RocksDBConsumeQueueStore.class);
+
when(messageStore.getQueueStore()).thenReturn(rocksDBConsumeQueueStore);
+ when(rocksDBConsumeQueueStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(10000L);
+ when(rocksDBConsumeQueueStore.get(anyString(), anyInt(),
anyLong())).then(new Answer<ByteBuffer>() {
+ @Override
+ public ByteBuffer answer(InvocationOnMock mock) throws Throwable {
+ long startIndex = mock.getArgument(2);
+ final ByteBuffer byteBuffer =
ByteBuffer.allocate(CQ_UNIT_SIZE);
+ long phyOffset = startIndex * 10;
+ byteBuffer.putLong(phyOffset);
+ byteBuffer.putInt(1);
+ byteBuffer.putLong(0);
+ byteBuffer.putLong(0);
+ byteBuffer.flip();
+ return byteBuffer;
+ }
+ });
+
+ RocksDBConsumeQueue consumeQueue = new
RocksDBConsumeQueue(messageStore, "topic", 0);
+ ReferredIterator<CqUnit> it = consumeQueue.iterateFrom(9000);
+ for (int i = 0; i < 1000; i++) {
+ assertTrue(it.hasNext());
+ CqUnit next = it.next();
+ assertEquals(9000 + i, next.getQueueOffset());
+ assertEquals(10 * (9000 + i), next.getPos());
+ }
+ assertFalse(it.hasNext());
+ }
+}
\ No newline at end of file