This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new 99bb415db4 [ISSUE #7740] Optimize LocalFileOffsetStore (#7745)
99bb415db4 is described below
commit 99bb415db46796a16205a6747f0627cc88178835
Author: Liu Shengzhong <[email protected]>
AuthorDate: Mon Jan 29 08:50:43 2024 +0800
[ISSUE #7740] Optimize LocalFileOffsetStore (#7745)
* Fix LocalFileOffsetStore persistAll and persist
* Fix LocalFileOffsetStore removeOffset
* Add test case
---
.../consumer/store/LocalFileOffsetStore.java | 45 ++++++++++++++++++++--
.../consumer/store/LocalFileOffsetStoreTest.java | 45 ++++++++++++++++++++++
2 files changed, 87 insertions(+), 3 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
index f949b75a81..0fbe8010db 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStore.java
@@ -131,10 +131,20 @@ public class LocalFileOffsetStore implements OffsetStore {
@Override
public void persistAll(Set<MessageQueue> mqs) {
- if (null == mqs || mqs.isEmpty())
+ if (null == mqs || mqs.isEmpty()) {
return;
+ }
+ OffsetSerializeWrapper offsetSerializeWrapper = null;
+ try {
+ offsetSerializeWrapper = readLocalOffset();
+ } catch (MQClientException e) {
+ log.error("readLocalOffset exception", e);
+ return;
+ }
- OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
+ if (offsetSerializeWrapper == null) {
+ offsetSerializeWrapper = new OffsetSerializeWrapper();
+ }
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
if (mqs.contains(entry.getKey())) {
AtomicLong offset = entry.getValue();
@@ -154,11 +164,40 @@ public class LocalFileOffsetStore implements OffsetStore {
@Override
public void persist(MessageQueue mq) {
+ if (mq == null) {
+ return;
+ }
+ AtomicLong offset = this.offsetTable.get(mq);
+ if (offset != null) {
+ OffsetSerializeWrapper offsetSerializeWrapper = null;
+ try {
+ offsetSerializeWrapper = readLocalOffset();
+ } catch (MQClientException e) {
+ log.error("readLocalOffset exception", e);
+ return;
+ }
+ if (offsetSerializeWrapper == null) {
+ offsetSerializeWrapper = new OffsetSerializeWrapper();
+ }
+ offsetSerializeWrapper.getOffsetTable().put(mq, offset);
+ String jsonString = offsetSerializeWrapper.toJson(true);
+ if (jsonString != null) {
+ try {
+ MixAll.string2File(jsonString, this.storePath);
+ } catch (IOException e) {
+ log.error("persist consumer offset exception, " + this.storePath, e);
+ }
+ }
+ }
}
@Override
public void removeOffset(MessageQueue mq) {
-
+ if (mq != null) {
+ this.offsetTable.remove(mq);
+ log.info("remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", this.groupName, mq,
+ offsetTable.size());
+ }
}
@Override
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
index a705b30fc3..e3f2cc070d 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/LocalFileOffsetStoreTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.client.consumer.store;
import java.io.File;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
@@ -85,4 +86,48 @@ public class LocalFileOffsetStoreTest {
assertThat(cloneOffsetTable.size()).isEqualTo(1);
assertThat(cloneOffsetTable.get(messageQueue)).isEqualTo(1024);
}
+
+ @Test
+ public void testPersist() throws Exception {
+ OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+
+ MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
+ offsetStore.updateOffset(messageQueue0, 1024, false);
+ offsetStore.persist(messageQueue0);
+ assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+ MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
+ assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(-1);
+ }
+
+ @Test
+ public void testPersistAll() throws Exception {
+ OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+
+ MessageQueue messageQueue0 = new MessageQueue(topic, brokerName, 0);
+ offsetStore.updateOffset(messageQueue0, 1024, false);
+ offsetStore.persistAll(new HashSet<MessageQueue>(Collections.singletonList(messageQueue0)));
+ assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+
+ MessageQueue messageQueue1 = new MessageQueue(topic, brokerName, 1);
+ MessageQueue messageQueue2 = new MessageQueue(topic, brokerName, 2);
+ offsetStore.updateOffset(messageQueue1, 1025, false);
+ offsetStore.updateOffset(messageQueue2, 1026, false);
+ offsetStore.persistAll(new HashSet<MessageQueue>(Arrays.asList(messageQueue1, messageQueue2)));
+
+ assertThat(offsetStore.readOffset(messageQueue0, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1024);
+ assertThat(offsetStore.readOffset(messageQueue1, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1025);
+ assertThat(offsetStore.readOffset(messageQueue2, ReadOffsetType.READ_FROM_STORE)).isEqualTo(1026);
+ }
+
+ @Test
+ public void testRemoveOffset() throws Exception {
+ OffsetStore offsetStore = new LocalFileOffsetStore(mQClientFactory, group);
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName, 0);
+ offsetStore.updateOffset(messageQueue, 1024, false);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(1024);
+
+ offsetStore.removeOffset(messageQueue);
+ assertThat(offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY)).isEqualTo(-1);
+ }
}
\ No newline at end of file