This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c90c96c1 [INLONG-6535][TubeMQ]The storage space used by the offset of TubeMQ broker is too large (#6543)
0c90c96c1 is described below

commit 0c90c96c1b44579f39aedf48895f5f5f9fb182d8
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Nov 15 14:59:13 2022 +0800

    [INLONG-6535][TubeMQ]The storage space used by the offset of TubeMQ broker is too large (#6543)
---
 .../tubemq/server/broker/BrokerServiceServer.java  |   6 +-
 .../broker/msgstore/MessageStoreManager.java       |  14 +-
 .../server/broker/msgstore/StoreService.java       |   5 +-
 .../server/broker/offset/DefaultOffsetManager.java |   8 +-
 .../tubemq/server/broker/offset/OffsetCsmItem.java |  46 ++++++
 .../{RecordItem.java => OffsetCsmRecord.java}      |  56 +++++---
 .../server/broker/offset/OffsetHistoryInfo.java    | 159 +++++++++++++++++++++
 .../server/broker/offset/OffsetRecordInfo.java     | 104 --------------
 .../server/broker/offset/OffsetRecordService.java  |   2 +-
 .../tubemq/server/broker/offset/OffsetService.java |   2 +-
 .../tubemq/server/common/TServerConstants.java     |   1 +
 11 files changed, 260 insertions(+), 143 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 616f0d653..47608dbb7 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -64,7 +64,7 @@ import 
org.apache.inlong.tubemq.server.broker.msgstore.MessageStore;
 import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
 import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult;
 import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
+import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
 import org.apache.inlong.tubemq.server.broker.offset.OffsetService;
 import org.apache.inlong.tubemq.server.broker.stats.BrokerSrvStatsHolder;
 import org.apache.inlong.tubemq.server.broker.stats.TrafficStatsService;
@@ -719,7 +719,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
      * @param waitRetryMs  wait duration on overflow
      * @param strBuff    string buffer
      */
-    public void appendGroupOffsetInfo(Map<String, OffsetRecordInfo> 
groupOffsetMap,
+    public void appendGroupOffsetInfo(Map<String, OffsetHistoryInfo> 
groupOffsetMap,
                                       int brokerAddrId, long storeTime, int 
retryCnt,
                                       long waitRetryMs, StringBuilder strBuff) 
{
         if (groupOffsetMap == null || groupOffsetMap.isEmpty()) {
@@ -739,7 +739,7 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
         AppendResult appendResult = new AppendResult();
         // get store time
         String sendTime = DateTimeConvertUtils.ms2yyyyMMddHHmm(storeTime);
-        for (Map.Entry<String, OffsetRecordInfo> entry : 
groupOffsetMap.entrySet()) {
+        for (Map.Entry<String, OffsetHistoryInfo> entry : 
groupOffsetMap.entrySet()) {
             if (entry == null || entry.getKey() == null || entry.getValue() == 
null) {
                 continue;
             }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 14ff6c863..75d4667d0 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -51,8 +51,8 @@ import 
org.apache.inlong.tubemq.server.broker.metadata.MetadataManager;
 import org.apache.inlong.tubemq.server.broker.metadata.TopicMetadata;
 import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult;
 import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
-import org.apache.inlong.tubemq.server.broker.offset.RecordItem;
+import org.apache.inlong.tubemq.server.broker.offset.OffsetCsmRecord;
+import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
 import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
 import org.apache.inlong.tubemq.server.broker.utils.TopicPubStoreInfo;
 import org.apache.inlong.tubemq.server.common.TStatusConstants;
@@ -462,17 +462,17 @@ public class MessageStoreManager implements StoreService {
      *
      */
     @Override
-    public void getTopicPublishInfos(Map<String, OffsetRecordInfo> 
groupOffsetMap) {
+    public void getTopicPublishInfos(Map<String, OffsetHistoryInfo> 
groupOffsetMap) {
         MessageStore store = null;
         Map<Integer, MessageStore> storeMap;
-        Map<String, Map<Integer, RecordItem>> topicOffsetMap;
-        for (Map.Entry<String, OffsetRecordInfo> entry : 
groupOffsetMap.entrySet()) {
+        Map<String, Map<Integer, OffsetCsmRecord>> topicOffsetMap;
+        for (Map.Entry<String, OffsetHistoryInfo> entry : 
groupOffsetMap.entrySet()) {
             if (entry == null || entry.getKey() == null || entry.getValue() == 
null) {
                 continue;
             }
             topicOffsetMap = entry.getValue().getOffsetMap();
             // Get offset records by topic
-            for (Map.Entry<String, Map<Integer, RecordItem>> entryTopic
+            for (Map.Entry<String, Map<Integer, OffsetCsmRecord>> entryTopic
                     : topicOffsetMap.entrySet()) {
                 if (entryTopic == null
                         || entryTopic.getKey() == null
@@ -484,7 +484,7 @@ public class MessageStoreManager implements StoreService {
                 if (storeMap == null) {
                     continue;
                 }
-                for (Map.Entry<Integer, RecordItem> entryRcd
+                for (Map.Entry<Integer, OffsetCsmRecord> entryRcd
                         : entryTopic.getValue().entrySet()) {
                     store = storeMap.get(entryRcd.getValue().getStoreId());
                     if (store == null) {
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
index c34702c5b..13092a141 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
@@ -21,7 +21,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
+
+import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
 import org.apache.inlong.tubemq.server.broker.utils.TopicPubStoreInfo;
 
 /**
@@ -44,5 +45,5 @@ public interface StoreService {
     // Add the current storage offset values to
     //  the consumption partition records of the specified consumption group
     //  include maximum and minimum, and consume lag
-    void getTopicPublishInfos(Map<String, OffsetRecordInfo> groupOffsetMap);
+    void getTopicPublishInfos(Map<String, OffsetHistoryInfo> groupOffsetMap);
 }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index cb9cdb353..91a8ee785 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -460,10 +460,10 @@ public class DefaultOffsetManager extends 
AbstractDaemonService implements Offse
      * @return group offset info in memory or zk
      */
     @Override
-    public Map<String, OffsetRecordInfo> getOnlineGroupOffsetInfo() {
-        OffsetRecordInfo recordInfo;
+    public Map<String, OffsetHistoryInfo> getOnlineGroupOffsetInfo() {
+        OffsetHistoryInfo recordInfo;
         Map<String, OffsetStorageInfo> storeMap;
-        Map<String, OffsetRecordInfo> result = new HashMap<>();
+        Map<String, OffsetHistoryInfo> result = new HashMap<>();
         for (Map.Entry<String,
                 ConcurrentHashMap<String, OffsetStorageInfo>> entry : 
cfmOffsetMap.entrySet()) {
             if (entry == null || entry.getKey() == null || entry.getValue() == 
null) {
@@ -480,7 +480,7 @@ public class DefaultOffsetManager extends 
AbstractDaemonService implements Offse
                 }
                 recordInfo = result.get(entry.getKey());
                 if (recordInfo == null) {
-                    recordInfo = new OffsetRecordInfo(
+                    recordInfo = new OffsetHistoryInfo(
                             brokerConfig.getBrokerId(), entry.getKey());
                     result.put(entry.getKey(), recordInfo);
                 }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmItem.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmItem.java
new file mode 100644
index 000000000..c028c8e2f
--- /dev/null
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmItem.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.offset;
+
+/**
+ * The offset snapshot of the consumer group on the partition.
+ */
+public class OffsetCsmItem {
+    protected int partitionId;
+    // consume group confirmed offset
+    protected long offsetCfm = 0L;
+    // consume group fetched offset
+    protected long offsetFetch = 0L;
+
+    public OffsetCsmItem(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public void addCsmOffsets(long offsetCfm, long offsetFetch) {
+        this.offsetCfm = offsetCfm;
+        this.offsetFetch = offsetFetch;
+    }
+
+    public void addCfmOffset(long offsetCfm) {
+        this.offsetCfm = offsetCfm;
+    }
+
+    public void addFetchOffset(long offsetFetch) {
+        this.offsetFetch = offsetFetch;
+    }
+}
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/RecordItem.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmRecord.java
similarity index 50%
rename from 
inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/RecordItem.java
rename to 
inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmRecord.java
index ffe9da9a6..7650a57d8 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/RecordItem.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmRecord.java
@@ -17,33 +17,51 @@
 
 package org.apache.inlong.tubemq.server.broker.offset;
 
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * The offset snapshot of the consumer group on the partition.
  */
-public class RecordItem {
+public class OffsetCsmRecord {
     protected int storeId;
-    protected int partitionId;
-    // consume group confirmed offset
-    protected long offsetCfm = 0L;
-    // partition min index offset
+    // store min index offset
     protected long offsetMin = 0L;
-    // partition max index offset
+    // store max index offset
     protected long offsetMax = 0L;
-    // consume lag
-    protected long offsetLag = 0L;
-    // partition min data offset
+    // store min data offset
     protected long dataMin = 0L;
-    // partition max data offset
+    // store max data offset
     protected long dataMax = 0L;
-    // consume data offset
-    protected long dataLag = -1L;
+    // partition consume record
+    protected final Map<Integer, OffsetCsmItem> partitionCsmMap = new 
HashMap<>();
 
-    public RecordItem(int partitionId, long offsetCfm) {
-        this.partitionId = partitionId % TBaseConstants.META_STORE_INS_BASE;
-        this.offsetCfm = offsetCfm;
-        this.storeId = partitionId / TBaseConstants.META_STORE_INS_BASE;
+    public OffsetCsmRecord(int storeId) {
+        this.storeId = storeId;
+    }
+
+    public void addOffsetCfmInfo(int partitionId, long offsetCfm) {
+        OffsetCsmItem offsetCsmItem = partitionCsmMap.get(partitionId);
+        if (offsetCsmItem == null) {
+            OffsetCsmItem tmpItem = new OffsetCsmItem(partitionId);
+            offsetCsmItem = partitionCsmMap.putIfAbsent(partitionId, tmpItem);
+            if (offsetCsmItem == null) {
+                offsetCsmItem = tmpItem;
+            }
+        }
+        offsetCsmItem.addCfmOffset(offsetCfm);
+    }
+
+    public void addOffsetFetchInfo(int partitionId, long offsetFetch) {
+        OffsetCsmItem offsetCsmItem = partitionCsmMap.get(partitionId);
+        if (offsetCsmItem == null) {
+            OffsetCsmItem tmpItem = new OffsetCsmItem(partitionId);
+            offsetCsmItem = partitionCsmMap.putIfAbsent(partitionId, tmpItem);
+            if (offsetCsmItem == null) {
+                offsetCsmItem = tmpItem;
+            }
+        }
+        offsetCsmItem.addFetchOffset(offsetFetch);
     }
 
     public void addStoreInfo(long offsetMin, long offsetMax,
@@ -52,10 +70,6 @@ public class RecordItem {
         this.offsetMax = offsetMax;
         this.dataMin = dataMin;
         this.dataMax = dataMax;
-        if (offsetMax != TBaseConstants.META_VALUE_UNDEFINED
-                && offsetCfm != TBaseConstants.META_VALUE_UNDEFINED) {
-            offsetLag = offsetMax - offsetCfm;
-        }
     }
 
     public int getStoreId() {
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
new file mode 100644
index 000000000..deb1766c4
--- /dev/null
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.offset;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.server.common.TServerConstants;
+
+/**
+ * The offset snapshot of the consumer group on the broker.
+ */
+public class OffsetHistoryInfo {
+    private final int brokerId;
+    private final String groupName;
+    private final Map<String, Map<Integer, OffsetCsmRecord>> histOffsetMap = 
new HashMap<>();
+
+    public OffsetHistoryInfo(int brokerId, String groupName) {
+        this.brokerId = brokerId;
+        this.groupName = groupName;
+    }
+
+    /**
+     * Add confirmed offset of topic-partitionId.
+     *
+     * @param topicName      topic name
+     * @param partitionId    partition id
+     * @param cfmOffset      the confirmed offset
+     */
+    public void addCfmOffsetInfo(String topicName, int partitionId, long 
cfmOffset) {
+        final int storeId = partitionId < TBaseConstants.META_STORE_INS_BASE
+                ? 0 : partitionId / TBaseConstants.META_STORE_INS_BASE;
+        Map<Integer, OffsetCsmRecord> storeOffsetMap = 
histOffsetMap.get(topicName);
+        if (storeOffsetMap == null) {
+            Map<Integer, OffsetCsmRecord> tmpMap = new HashMap<>();
+            storeOffsetMap = histOffsetMap.putIfAbsent(topicName, tmpMap);
+            if (storeOffsetMap == null) {
+                storeOffsetMap = tmpMap;
+            }
+        }
+        OffsetCsmRecord offsetCsmRecord = storeOffsetMap.get(storeId);
+        if (offsetCsmRecord == null) {
+            OffsetCsmRecord tmpRecord = new OffsetCsmRecord(storeId);
+            offsetCsmRecord = storeOffsetMap.putIfAbsent(storeId, tmpRecord);
+            if (offsetCsmRecord == null) {
+                offsetCsmRecord = tmpRecord;
+            }
+        }
+        offsetCsmRecord.addOffsetCfmInfo(partitionId, cfmOffset);
+    }
+
+    /**
+     * Add inflight offset of topic-partitionId.
+     *
+     * @param topicName      topic name
+     * @param partitionId    partition id
+     * @param tmpOffset      the inflight offset
+     */
+    public void addInflightOffsetInfo(String topicName, int partitionId, long 
tmpOffset) {
+        final int storeId = partitionId < TBaseConstants.META_STORE_INS_BASE
+                ? 0 : partitionId / TBaseConstants.META_STORE_INS_BASE;
+        Map<Integer, OffsetCsmRecord> storeOffsetMap = 
histOffsetMap.get(topicName);
+        if (storeOffsetMap == null) {
+            Map<Integer, OffsetCsmRecord> tmpMap = new HashMap<>();
+            storeOffsetMap = histOffsetMap.putIfAbsent(topicName, tmpMap);
+            if (storeOffsetMap == null) {
+                storeOffsetMap = tmpMap;
+            }
+        }
+        OffsetCsmRecord offsetCsmRecord = storeOffsetMap.get(storeId);
+        if (offsetCsmRecord == null) {
+            OffsetCsmRecord tmpRecord = new OffsetCsmRecord(storeId);
+            offsetCsmRecord = storeOffsetMap.putIfAbsent(storeId, tmpRecord);
+            if (offsetCsmRecord == null) {
+                offsetCsmRecord = tmpRecord;
+            }
+        }
+        offsetCsmRecord.addOffsetFetchInfo(partitionId, tmpOffset);
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public Map<String, Map<Integer, OffsetCsmRecord>> getOffsetMap() {
+        return histOffsetMap;
+    }
+
+    /**
+     * Build brief consumption offset information in string format
+     *
+     * @param strBuff     string buffer
+     * @param dataTime    record build time
+     */
+    public void buildRecordInfo(StringBuilder strBuff, long dataTime) {
+        int topicCnt = 0;
+        strBuff.append("{\"dt\":\"")
+                .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(dataTime))
+                .append("\",\"bId\":").append(brokerId)
+                
.append(",\"ver\":").append(TServerConstants.OFFSET_HISTORY_RECORD_SHORT_VERSION)
+                .append(",\"records\":[");
+        for (Map.Entry<String, Map<Integer, OffsetCsmRecord>> entry : 
histOffsetMap.entrySet()) {
+            if (entry == null || entry.getKey() == null || entry.getValue() == 
null) {
+                continue;
+            }
+            if (topicCnt++ > 0) {
+                strBuff.append(",");
+            }
+            int recordCnt = 0;
+            
strBuff.append("{\"topic\":\"").append(entry.getKey()).append("\",\"offsets\":[");
+            Map<Integer, OffsetCsmRecord> csmOffsetRecordMap = 
entry.getValue();
+            for (OffsetCsmRecord offsetRecord : csmOffsetRecordMap.values()) {
+                if (offsetRecord == null) {
+                    continue;
+                }
+                if (recordCnt++ > 0) {
+                    strBuff.append(",");
+                }
+                strBuff.append("{\"storeId\":").append(offsetRecord.storeId)
+                        .append(",\"iMin\":").append(offsetRecord.offsetMin)
+                        .append(",\"iMax\":").append(offsetRecord.offsetMax)
+                        .append(",\"dMin\":").append(offsetRecord.dataMin)
+                        .append(",\"dMax\":").append(offsetRecord.dataMax)
+                        .append(",\"parts\":[");
+                int partCnt = 0;
+                for (OffsetCsmItem csmOffsetItem : 
offsetRecord.partitionCsmMap.values()) {
+                    if (csmOffsetItem == null) {
+                        continue;
+                    }
+                    if (partCnt++ > 0) {
+                        strBuff.append(",");
+                    }
+                    
strBuff.append("{\"partId\":").append(csmOffsetItem.partitionId)
+                            
.append(",\"iCfm\":").append(csmOffsetItem.offsetCfm)
+                            .append("}");
+                }
+                strBuff.append("]}");
+            }
+            strBuff.append("]}");
+        }
+        strBuff.append("]}");
+    }
+}
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordInfo.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordInfo.java
deleted file mode 100644
index ab2bfcdd3..000000000
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordInfo.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.offset;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.inlong.tubemq.corebase.TokenConstants;
-import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
-
-/**
- * The offset snapshot of the consumer group on the broker.
- */
-public class OffsetRecordInfo {
-    private final int brokerId;
-    private final String groupName;
-    private final Map<String, Map<Integer, RecordItem>> histOffsetMap = new 
HashMap<>();
-
-    public OffsetRecordInfo(int brokerId, String groupName) {
-        this.brokerId = brokerId;
-        this.groupName = groupName;
-    }
-
-    /**
-     * Add confirmed offset of topic-partitionId.
-     *
-     * @param topicName      topic name
-     * @param partitionId    partition id
-     * @param cfmOffset      the confirmed offset
-     */
-    public void addCfmOffsetInfo(String topicName, int partitionId, long 
cfmOffset) {
-        Map<Integer, RecordItem> partOffsetMap = histOffsetMap.get(topicName);
-        if (partOffsetMap == null) {
-            Map<Integer, RecordItem> tmpMap = new HashMap<>();
-            partOffsetMap = histOffsetMap.putIfAbsent(topicName, tmpMap);
-            if (partOffsetMap == null) {
-                partOffsetMap = tmpMap;
-            }
-        }
-        partOffsetMap.put(partitionId, new RecordItem(partitionId, cfmOffset));
-    }
-
-    public Map<String, Map<Integer, RecordItem>> getOffsetMap() {
-        return histOffsetMap;
-    }
-
-    /**
-     * Build consumption offset information in string format
-     *
-     * @param strBuff     string buffer
-     * @param dataTime    record build time
-     */
-    public void buildRecordInfo(StringBuilder strBuff, long dataTime) {
-        int topicCnt = 0;
-        
strBuff.append("{\"groupName\":\"").append(groupName).append("\",\"recordTime\":\"")
-                .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(dataTime))
-                .append("\",\"brokerId\":").append(brokerId)
-                
.append(",\"version\":").append(TServerConstants.OFFSET_HISTORY_RECORD_VERSION)
-                .append(",\"records\":[");
-        for (Map.Entry<String, Map<Integer, RecordItem>> entry : 
histOffsetMap.entrySet()) {
-            if (entry == null || entry.getKey() == null || entry.getValue() == 
null) {
-                continue;
-            }
-            if (topicCnt++ > 0) {
-                strBuff.append(",");
-            }
-            int recordCnt = 0;
-            strBuff.append("{\"topicName\":\"").append(entry.getKey())
-                    .append("\",\"offsetInfo\":[");
-            Map<Integer, RecordItem> recordItemMap = entry.getValue();
-            for (RecordItem recordItem : recordItemMap.values()) {
-                if (recordCnt++ > 0) {
-                    strBuff.append(",");
-                }
-                strBuff.append("{\"partitionKey\":\"").append(brokerId)
-                        .append(TokenConstants.ATTR_SEP).append(entry.getKey())
-                        
.append(TokenConstants.ATTR_SEP).append(recordItem.partitionId)
-                        
.append("\",\"offsetCfm\":").append(recordItem.offsetCfm)
-                        .append(",\"offsetMin\":").append(recordItem.offsetMin)
-                        .append(",\"offsetMax\":").append(recordItem.offsetMax)
-                        .append(",\"offsetLag\":").append(recordItem.offsetLag)
-                        .append(",\"dataMin\":").append(recordItem.dataMin)
-                        
.append(",\"dataMax\":").append(recordItem.dataMax).append("}");
-            }
-            strBuff.append("]}");
-        }
-        strBuff.append("]}");
-    }
-}
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 16901911f..cbc26c1dc 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -83,7 +83,7 @@ public class OffsetRecordService extends 
AbstractDaemonService {
             return;
         }
         // get group offset information
-        Map<String, OffsetRecordInfo> groupOffsetMap =
+        Map<String, OffsetHistoryInfo> groupOffsetMap =
                 offsetManager.getOnlineGroupOffsetInfo();
         if (groupOffsetMap == null || groupOffsetMap.isEmpty()) {
             return;
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetService.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetService.java
index b6e342276..1b22163d4 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetService.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetService.java
@@ -68,7 +68,7 @@ public interface OffsetService {
     Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset(
             String group, Map<String, Set<Integer>> topicPartMap);
 
-    Map<String, OffsetRecordInfo> getOnlineGroupOffsetInfo();
+    Map<String, OffsetHistoryInfo> getOnlineGroupOffsetInfo();
 
     boolean modifyGroupOffset(Set<String> groups,
                               List<Tuple3<String, Integer, Long>> 
topicPartOffsets,
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index a7117983a..f67ccb86a 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -109,4 +109,5 @@ public final class TServerConstants {
     public static final int META_MAX_STATSTYPE_LENGTH = 256;
 
     public static final int OFFSET_HISTORY_RECORD_VERSION = 1;
+    public static final int OFFSET_HISTORY_RECORD_SHORT_VERSION = 2;
 }

Reply via email to