This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d38558800c [ISSUE #7740] Optimize LocalFileOffsetStore
d38558800c is described below
commit d38558800c184ad34030388afec54715ad6784a8
Author: Liu Shengzhong <[email protected]>
AuthorDate: Tue Feb 6 09:20:02 2024 +0800
[ISSUE #7740] Optimize LocalFileOffsetStore
---
.../consumer/store/LocalFileOffsetStore.java | 45 +++++++++++++++++++--
.../consumer/store/LocalFileOffsetStoreTest.java | 47 +++++++++++++++++++++-
2 files changed, 88 insertions(+), 4 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index 074508c46b..38b0a5be35 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -138,10 +138,20 @@ public class LocalFileOffsetStore implements OffsetStore {
@Override
public void persistAll(Set<MessageQueue> mqs) {
- if (null == mqs || mqs.isEmpty())
+ if (null == mqs || mqs.isEmpty()) {
return;
+ }
+ OffsetSerializeWrapper offsetSerializeWrapper = null;
+ try {
+ offsetSerializeWrapper = readLocalOffset();
+ } catch (MQClientException e) {
+ log.error("readLocalOffset exception", e);
+ return;
+ }
- OffsetSerializeWrapper offsetSerializeWrapper = new
OffsetSerializeWrapper();
+ if (offsetSerializeWrapper == null) {
+ offsetSerializeWrapper = new OffsetSerializeWrapper();
+ }
for (Map.Entry<MessageQueue, ControllableOffset> entry :
this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = new
AtomicLong(entry.getValue().getOffset());
@@ -161,11 +171,40 @@ public class LocalFileOffsetStore implements OffsetStore {
@Override
public void persist(MessageQueue mq) {
+ if (mq == null) {
+ return;
+ }
+ ControllableOffset offset = this.offsetTable.get(mq);
+ if (offset != null) {
+ OffsetSerializeWrapper offsetSerializeWrapper = null;
+ try {
+ offsetSerializeWrapper = readLocalOffset();
+ } catch (MQClientException e) {
+ log.error("readLocalOffset exception", e);
+ return;
+ }
+ if (offsetSerializeWrapper == null) {
+ offsetSerializeWrapper = new OffsetSerializeWrapper();
+ }
+ offsetSerializeWrapper.getOffsetTable().put(mq, new
AtomicLong(offset.getOffset()));
+ String jsonString = offsetSerializeWrapper.toJson(true);
+ if (jsonString != null) {
+ try {
+ MixAll.string2File(jsonString, this.storePath);
+ } catch (IOException e) {
+ log.error("persist consumer offset exception, " +
this.storePath, e);
+ }
+ }
+ }
}
@Override
public void removeOffset(MessageQueue mq) {
-
+ if (mq != null) {
+ this.offsetTable.remove(mq);
+ log.info("remove unnecessary messageQueue offset. group={}, mq={},
offsetTableSize={}", this.groupName, mq,
+ offsetTable.size());
+ }
}
@Override
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index c31c708dbb..2f88523bc1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.consumer.store;
import java.io.File;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -85,4 +86,48 @@ public class LocalFileOffsetStoreTest {
assertThat(cloneOffsetTable.size()).isEqualTo(1);
assertThat(cloneOffsetTable.get(messageQueue)).isEqualTo(1024);
}
-}
\ No newline at end of file
+
+ @Test
+ public void testPersist() throws Exception {
+ OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory,
group);
+
+ MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
+ offsetStore.updateOffset(messageQueue0, 1024, false);
+ offsetStore.persist(messageQueue0);
+ assertThat(offsetStore.readOffset(messageQueue0,
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+ MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
+ assertThat(offsetStore.readOffset(messageQueue1,
ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
+ }
+
+ @Test
+ public void testPersistAll() throws Exception {
+ OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory,
group);
+
+ MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
+ offsetStore.updateOffset(messageQueue0, 1024, false);
+ offsetStore.persistAll(new
HashSet<MessageQueue>(Collections.singletonList(messageQueue0)));
+ assertThat(offsetStore.readOffset(messageQueue0,
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+ MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
+ MessageQueue messageQueue2 = new MessageQueue(topic, brokerName, 2);
+ offsetStore.updateOffset(messageQueue1, 1025, false);
+ offsetStore.updateOffset(messageQueue2, 1026, false);
+ offsetStore.persistAll(new
HashSet<MessageQueue>(Arrays.asList(messageQueue1, messageQueue2)));
+
+ assertThat(offsetStore.readOffset(messageQueue0,
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+ assertThat(offsetStore.readOffset(messageQueue1,
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
+ assertThat(offsetStore.readOffset(messageQueue2,
ReadOffsetType.READ_FROM_STORE)).isEqualTo(1026);
+ }
+
+ @Test
+ public void testRemoveOffset() throws Exception {
+ OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory,
group);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
+ offsetStore.updateOffset(messageQueue, 1024, false);
+ assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+ offsetStore.removeOffset(messageQueue);
+ assertThat(offsetStore.readOffset(messageQueue,
ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
+ }
+}