This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0c90c96c1 [INLONG-6535][TubeMQ]The storage space used by the offset of
TubeMQ broker is too large (#6543)
0c90c96c1 is described below
commit 0c90c96c1b44579f39aedf48895f5f5f9fb182d8
Author: Goson Zhang <[email protected]>
AuthorDate: Tue Nov 15 14:59:13 2022 +0800
[INLONG-6535][TubeMQ]The storage space used by the offset of TubeMQ broker
is too large (#6543)
---
.../tubemq/server/broker/BrokerServiceServer.java | 6 +-
.../broker/msgstore/MessageStoreManager.java | 14 +-
.../server/broker/msgstore/StoreService.java | 5 +-
.../server/broker/offset/DefaultOffsetManager.java | 8 +-
.../tubemq/server/broker/offset/OffsetCsmItem.java | 46 ++++++
.../{RecordItem.java => OffsetCsmRecord.java} | 56 +++++---
.../server/broker/offset/OffsetHistoryInfo.java | 159 +++++++++++++++++++++
.../server/broker/offset/OffsetRecordInfo.java | 104 --------------
.../server/broker/offset/OffsetRecordService.java | 2 +-
.../tubemq/server/broker/offset/OffsetService.java | 2 +-
.../tubemq/server/common/TServerConstants.java | 1 +
11 files changed, 260 insertions(+), 143 deletions(-)
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 616f0d653..47608dbb7 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -64,7 +64,7 @@ import
org.apache.inlong.tubemq.server.broker.msgstore.MessageStore;
import org.apache.inlong.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
+import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
import org.apache.inlong.tubemq.server.broker.offset.OffsetService;
import org.apache.inlong.tubemq.server.broker.stats.BrokerSrvStatsHolder;
import org.apache.inlong.tubemq.server.broker.stats.TrafficStatsService;
@@ -719,7 +719,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
* @param waitRetryMs wait duration on overflow
* @param strBuff string buffer
*/
- public void appendGroupOffsetInfo(Map<String, OffsetRecordInfo>
groupOffsetMap,
+ public void appendGroupOffsetInfo(Map<String, OffsetHistoryInfo>
groupOffsetMap,
int brokerAddrId, long storeTime, int
retryCnt,
long waitRetryMs, StringBuilder strBuff)
{
if (groupOffsetMap == null || groupOffsetMap.isEmpty()) {
@@ -739,7 +739,7 @@ public class BrokerServiceServer implements
BrokerReadService, BrokerWriteServic
AppendResult appendResult = new AppendResult();
// get store time
String sendTime = DateTimeConvertUtils.ms2yyyyMMddHHmm(storeTime);
- for (Map.Entry<String, OffsetRecordInfo> entry :
groupOffsetMap.entrySet()) {
+ for (Map.Entry<String, OffsetHistoryInfo> entry :
groupOffsetMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() ==
null) {
continue;
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 14ff6c863..75d4667d0 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -51,8 +51,8 @@ import
org.apache.inlong.tubemq.server.broker.metadata.MetadataManager;
import org.apache.inlong.tubemq.server.broker.metadata.TopicMetadata;
import org.apache.inlong.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.inlong.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
-import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
-import org.apache.inlong.tubemq.server.broker.offset.RecordItem;
+import org.apache.inlong.tubemq.server.broker.offset.OffsetCsmRecord;
+import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
import org.apache.inlong.tubemq.server.broker.utils.DataStoreUtils;
import org.apache.inlong.tubemq.server.broker.utils.TopicPubStoreInfo;
import org.apache.inlong.tubemq.server.common.TStatusConstants;
@@ -462,17 +462,17 @@ public class MessageStoreManager implements StoreService {
*
*/
@Override
- public void getTopicPublishInfos(Map<String, OffsetRecordInfo>
groupOffsetMap) {
+ public void getTopicPublishInfos(Map<String, OffsetHistoryInfo>
groupOffsetMap) {
MessageStore store = null;
Map<Integer, MessageStore> storeMap;
- Map<String, Map<Integer, RecordItem>> topicOffsetMap;
- for (Map.Entry<String, OffsetRecordInfo> entry :
groupOffsetMap.entrySet()) {
+ Map<String, Map<Integer, OffsetCsmRecord>> topicOffsetMap;
+ for (Map.Entry<String, OffsetHistoryInfo> entry :
groupOffsetMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() ==
null) {
continue;
}
topicOffsetMap = entry.getValue().getOffsetMap();
// Get offset records by topic
- for (Map.Entry<String, Map<Integer, RecordItem>> entryTopic
+ for (Map.Entry<String, Map<Integer, OffsetCsmRecord>> entryTopic
: topicOffsetMap.entrySet()) {
if (entryTopic == null
|| entryTopic.getKey() == null
@@ -484,7 +484,7 @@ public class MessageStoreManager implements StoreService {
if (storeMap == null) {
continue;
}
- for (Map.Entry<Integer, RecordItem> entryRcd
+ for (Map.Entry<Integer, OffsetCsmRecord> entryRcd
: entryTopic.getValue().entrySet()) {
store = storeMap.get(entryRcd.getValue().getStoreId());
if (store == null) {
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
index c34702c5b..13092a141 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/StoreService.java
@@ -21,7 +21,8 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.apache.inlong.tubemq.server.broker.offset.OffsetRecordInfo;
+
+import org.apache.inlong.tubemq.server.broker.offset.OffsetHistoryInfo;
import org.apache.inlong.tubemq.server.broker.utils.TopicPubStoreInfo;
/**
@@ -44,5 +45,5 @@ public interface StoreService {
// Add the current storage offset values to
// the consumption partition records of the specified consumption group
// include maximum and minimum, and consume lag
- void getTopicPublishInfos(Map<String, OffsetRecordInfo> groupOffsetMap);
+ void getTopicPublishInfos(Map<String, OffsetHistoryInfo> groupOffsetMap);
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
index cb9cdb353..91a8ee785 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/DefaultOffsetManager.java
@@ -460,10 +460,10 @@ public class DefaultOffsetManager extends
AbstractDaemonService implements Offse
* @return group offset info in memory or zk
*/
@Override
- public Map<String, OffsetRecordInfo> getOnlineGroupOffsetInfo() {
- OffsetRecordInfo recordInfo;
+ public Map<String, OffsetHistoryInfo> getOnlineGroupOffsetInfo() {
+ OffsetHistoryInfo recordInfo;
Map<String, OffsetStorageInfo> storeMap;
- Map<String, OffsetRecordInfo> result = new HashMap<>();
+ Map<String, OffsetHistoryInfo> result = new HashMap<>();
for (Map.Entry<String,
ConcurrentHashMap<String, OffsetStorageInfo>> entry :
cfmOffsetMap.entrySet()) {
if (entry == null || entry.getKey() == null || entry.getValue() ==
null) {
@@ -480,7 +480,7 @@ public class DefaultOffsetManager extends
AbstractDaemonService implements Offse
}
recordInfo = result.get(entry.getKey());
if (recordInfo == null) {
- recordInfo = new OffsetRecordInfo(
+ recordInfo = new OffsetHistoryInfo(
brokerConfig.getBrokerId(), entry.getKey());
result.put(entry.getKey(), recordInfo);
}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmItem.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmItem.java
new file mode 100644
index 000000000..c028c8e2f
--- /dev/null
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmItem.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.offset;
+
+/**
+ * The offset snapshot of the consumer group on the partition.
+ */
+public class OffsetCsmItem {
+ protected int partitionId;
+ // consume group confirmed offset
+ protected long offsetCfm = 0L;
+ // consume group fetched offset
+ protected long offsetFetch = 0L;
+
+ public OffsetCsmItem(int partitionId) {
+ this.partitionId = partitionId;
+ }
+
+ public void addCsmOffsets(long offsetCfm, long offsetFetch) {
+ this.offsetCfm = offsetCfm;
+ this.offsetFetch = offsetFetch;
+ }
+
+ public void addCfmOffset(long offsetCfm) {
+ this.offsetCfm = offsetCfm;
+ }
+
+ public void addFetchOffset(long offsetFetch) {
+ this.offsetFetch = offsetFetch;
+ }
+}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/RecordItem.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmRecord.java
similarity index 50%
rename from
inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/RecordItem.java
rename to
inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmRecord.java
index ffe9da9a6..7650a57d8 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/RecordItem.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetCsmRecord.java
@@ -17,33 +17,51 @@
package org.apache.inlong.tubemq.server.broker.offset;
-import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import java.util.HashMap;
+import java.util.Map;
/**
* The offset snapshot of the consumer group on the partition.
*/
-public class RecordItem {
+public class OffsetCsmRecord {
protected int storeId;
- protected int partitionId;
- // consume group confirmed offset
- protected long offsetCfm = 0L;
- // partition min index offset
+ // store min index offset
protected long offsetMin = 0L;
- // partition max index offset
+ // store max index offset
protected long offsetMax = 0L;
- // consume lag
- protected long offsetLag = 0L;
- // partition min data offset
+ // store min data offset
protected long dataMin = 0L;
- // partition max data offset
+ // store max data offset
protected long dataMax = 0L;
- // consume data offset
- protected long dataLag = -1L;
+ // partition consume record
+ protected final Map<Integer, OffsetCsmItem> partitionCsmMap = new
HashMap<>();
- public RecordItem(int partitionId, long offsetCfm) {
- this.partitionId = partitionId % TBaseConstants.META_STORE_INS_BASE;
- this.offsetCfm = offsetCfm;
- this.storeId = partitionId / TBaseConstants.META_STORE_INS_BASE;
+ public OffsetCsmRecord(int storeId) {
+ this.storeId = storeId;
+ }
+
+ public void addOffsetCfmInfo(int partitionId, long offsetCfm) {
+ OffsetCsmItem offsetCsmItem = partitionCsmMap.get(partitionId);
+ if (offsetCsmItem == null) {
+ OffsetCsmItem tmpItem = new OffsetCsmItem(partitionId);
+ offsetCsmItem = partitionCsmMap.putIfAbsent(partitionId, tmpItem);
+ if (offsetCsmItem == null) {
+ offsetCsmItem = tmpItem;
+ }
+ }
+ offsetCsmItem.addCfmOffset(offsetCfm);
+ }
+
+ public void addOffsetFetchInfo(int partitionId, long offsetFetch) {
+ OffsetCsmItem offsetCsmItem = partitionCsmMap.get(partitionId);
+ if (offsetCsmItem == null) {
+ OffsetCsmItem tmpItem = new OffsetCsmItem(partitionId);
+ offsetCsmItem = partitionCsmMap.putIfAbsent(partitionId, tmpItem);
+ if (offsetCsmItem == null) {
+ offsetCsmItem = tmpItem;
+ }
+ }
+ offsetCsmItem.addFetchOffset(offsetFetch);
}
public void addStoreInfo(long offsetMin, long offsetMax,
@@ -52,10 +70,6 @@ public class RecordItem {
this.offsetMax = offsetMax;
this.dataMin = dataMin;
this.dataMax = dataMax;
- if (offsetMax != TBaseConstants.META_VALUE_UNDEFINED
- && offsetCfm != TBaseConstants.META_VALUE_UNDEFINED) {
- offsetLag = offsetMax - offsetCfm;
- }
}
public int getStoreId() {
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
new file mode 100644
index 000000000..deb1766c4
--- /dev/null
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetHistoryInfo.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tubemq.server.broker.offset;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.inlong.tubemq.corebase.TBaseConstants;
+import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
+import org.apache.inlong.tubemq.server.common.TServerConstants;
+
+/**
+ * The offset snapshot of the consumer group on the broker.
+ */
+public class OffsetHistoryInfo {
+ private final int brokerId;
+ private final String groupName;
+ private final Map<String, Map<Integer, OffsetCsmRecord>> histOffsetMap =
new HashMap<>();
+
+ public OffsetHistoryInfo(int brokerId, String groupName) {
+ this.brokerId = brokerId;
+ this.groupName = groupName;
+ }
+
+ /**
+ * Add confirmed offset of topic-partitionId.
+ *
+ * @param topicName topic name
+ * @param partitionId partition id
+ * @param cfmOffset the confirmed offset
+ */
+ public void addCfmOffsetInfo(String topicName, int partitionId, long
cfmOffset) {
+ final int storeId = partitionId < TBaseConstants.META_STORE_INS_BASE
+ ? 0 : partitionId / TBaseConstants.META_STORE_INS_BASE;
+ Map<Integer, OffsetCsmRecord> storeOffsetMap =
histOffsetMap.get(topicName);
+ if (storeOffsetMap == null) {
+ Map<Integer, OffsetCsmRecord> tmpMap = new HashMap<>();
+ storeOffsetMap = histOffsetMap.putIfAbsent(topicName, tmpMap);
+ if (storeOffsetMap == null) {
+ storeOffsetMap = tmpMap;
+ }
+ }
+ OffsetCsmRecord offsetCsmRecord = storeOffsetMap.get(storeId);
+ if (offsetCsmRecord == null) {
+ OffsetCsmRecord tmpRecord = new OffsetCsmRecord(storeId);
+ offsetCsmRecord = storeOffsetMap.putIfAbsent(storeId, tmpRecord);
+ if (offsetCsmRecord == null) {
+ offsetCsmRecord = tmpRecord;
+ }
+ }
+ offsetCsmRecord.addOffsetCfmInfo(partitionId, cfmOffset);
+ }
+
+ /**
+ * Add inflight offset of topic-partitionId.
+ *
+ * @param topicName topic name
+ * @param partitionId partition id
+ * @param tmpOffset the inflight offset
+ */
+ public void addInflightOffsetInfo(String topicName, int partitionId, long
tmpOffset) {
+ final int storeId = partitionId < TBaseConstants.META_STORE_INS_BASE
+ ? 0 : partitionId / TBaseConstants.META_STORE_INS_BASE;
+ Map<Integer, OffsetCsmRecord> storeOffsetMap =
histOffsetMap.get(topicName);
+ if (storeOffsetMap == null) {
+ Map<Integer, OffsetCsmRecord> tmpMap = new HashMap<>();
+ storeOffsetMap = histOffsetMap.putIfAbsent(topicName, tmpMap);
+ if (storeOffsetMap == null) {
+ storeOffsetMap = tmpMap;
+ }
+ }
+ OffsetCsmRecord offsetCsmRecord = storeOffsetMap.get(storeId);
+ if (offsetCsmRecord == null) {
+ OffsetCsmRecord tmpRecord = new OffsetCsmRecord(storeId);
+ offsetCsmRecord = storeOffsetMap.putIfAbsent(storeId, tmpRecord);
+ if (offsetCsmRecord == null) {
+ offsetCsmRecord = tmpRecord;
+ }
+ }
+ offsetCsmRecord.addOffsetFetchInfo(partitionId, tmpOffset);
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public Map<String, Map<Integer, OffsetCsmRecord>> getOffsetMap() {
+ return histOffsetMap;
+ }
+
+ /**
+ * Build brief consumption offset information in string format
+ *
+ * @param strBuff string buffer
+ * @param dataTime record build time
+ */
+ public void buildRecordInfo(StringBuilder strBuff, long dataTime) {
+ int topicCnt = 0;
+ strBuff.append("{\"dt\":\"")
+ .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(dataTime))
+ .append("\",\"bId\":").append(brokerId)
+
.append(",\"ver\":").append(TServerConstants.OFFSET_HISTORY_RECORD_SHORT_VERSION)
+ .append(",\"records\":[");
+ for (Map.Entry<String, Map<Integer, OffsetCsmRecord>> entry :
histOffsetMap.entrySet()) {
+ if (entry == null || entry.getKey() == null || entry.getValue() ==
null) {
+ continue;
+ }
+ if (topicCnt++ > 0) {
+ strBuff.append(",");
+ }
+ int recordCnt = 0;
+
strBuff.append("{\"topic\":\"").append(entry.getKey()).append("\",\"offsets\":[");
+ Map<Integer, OffsetCsmRecord> csmOffsetRecordMap =
entry.getValue();
+ for (OffsetCsmRecord offsetRecord : csmOffsetRecordMap.values()) {
+ if (offsetRecord == null) {
+ continue;
+ }
+ if (recordCnt++ > 0) {
+ strBuff.append(",");
+ }
+ strBuff.append("{\"storeId\":").append(offsetRecord.storeId)
+ .append(",\"iMin\":").append(offsetRecord.offsetMin)
+ .append(",\"iMax\":").append(offsetRecord.offsetMax)
+ .append(",\"dMin\":").append(offsetRecord.dataMin)
+ .append(",\"dMax\":").append(offsetRecord.dataMax)
+ .append(",\"parts\":[");
+ int partCnt = 0;
+ for (OffsetCsmItem csmOffsetItem :
offsetRecord.partitionCsmMap.values()) {
+ if (csmOffsetItem == null) {
+ continue;
+ }
+ if (partCnt++ > 0) {
+ strBuff.append(",");
+ }
+
strBuff.append("{\"partId\":").append(csmOffsetItem.partitionId)
+
.append(",\"iCfm\":").append(csmOffsetItem.offsetCfm)
+ .append("}");
+ }
+ strBuff.append("]}");
+ }
+ strBuff.append("]}");
+ }
+ strBuff.append("]}");
+ }
+}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordInfo.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordInfo.java
deleted file mode 100644
index ab2bfcdd3..000000000
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordInfo.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.tubemq.server.broker.offset;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.inlong.tubemq.corebase.TokenConstants;
-import org.apache.inlong.tubemq.corebase.utils.DateTimeConvertUtils;
-import org.apache.inlong.tubemq.server.common.TServerConstants;
-
-/**
- * The offset snapshot of the consumer group on the broker.
- */
-public class OffsetRecordInfo {
- private final int brokerId;
- private final String groupName;
- private final Map<String, Map<Integer, RecordItem>> histOffsetMap = new
HashMap<>();
-
- public OffsetRecordInfo(int brokerId, String groupName) {
- this.brokerId = brokerId;
- this.groupName = groupName;
- }
-
- /**
- * Add confirmed offset of topic-partitionId.
- *
- * @param topicName topic name
- * @param partitionId partition id
- * @param cfmOffset the confirmed offset
- */
- public void addCfmOffsetInfo(String topicName, int partitionId, long
cfmOffset) {
- Map<Integer, RecordItem> partOffsetMap = histOffsetMap.get(topicName);
- if (partOffsetMap == null) {
- Map<Integer, RecordItem> tmpMap = new HashMap<>();
- partOffsetMap = histOffsetMap.putIfAbsent(topicName, tmpMap);
- if (partOffsetMap == null) {
- partOffsetMap = tmpMap;
- }
- }
- partOffsetMap.put(partitionId, new RecordItem(partitionId, cfmOffset));
- }
-
- public Map<String, Map<Integer, RecordItem>> getOffsetMap() {
- return histOffsetMap;
- }
-
- /**
- * Build consumption offset information in string format
- *
- * @param strBuff string buffer
- * @param dataTime record build time
- */
- public void buildRecordInfo(StringBuilder strBuff, long dataTime) {
- int topicCnt = 0;
-
strBuff.append("{\"groupName\":\"").append(groupName).append("\",\"recordTime\":\"")
- .append(DateTimeConvertUtils.ms2yyyyMMddHHmmss(dataTime))
- .append("\",\"brokerId\":").append(brokerId)
-
.append(",\"version\":").append(TServerConstants.OFFSET_HISTORY_RECORD_VERSION)
- .append(",\"records\":[");
- for (Map.Entry<String, Map<Integer, RecordItem>> entry :
histOffsetMap.entrySet()) {
- if (entry == null || entry.getKey() == null || entry.getValue() ==
null) {
- continue;
- }
- if (topicCnt++ > 0) {
- strBuff.append(",");
- }
- int recordCnt = 0;
- strBuff.append("{\"topicName\":\"").append(entry.getKey())
- .append("\",\"offsetInfo\":[");
- Map<Integer, RecordItem> recordItemMap = entry.getValue();
- for (RecordItem recordItem : recordItemMap.values()) {
- if (recordCnt++ > 0) {
- strBuff.append(",");
- }
- strBuff.append("{\"partitionKey\":\"").append(brokerId)
- .append(TokenConstants.ATTR_SEP).append(entry.getKey())
-
.append(TokenConstants.ATTR_SEP).append(recordItem.partitionId)
-
.append("\",\"offsetCfm\":").append(recordItem.offsetCfm)
- .append(",\"offsetMin\":").append(recordItem.offsetMin)
- .append(",\"offsetMax\":").append(recordItem.offsetMax)
- .append(",\"offsetLag\":").append(recordItem.offsetLag)
- .append(",\"dataMin\":").append(recordItem.dataMin)
-
.append(",\"dataMax\":").append(recordItem.dataMax).append("}");
- }
- strBuff.append("]}");
- }
- strBuff.append("]}");
- }
-}
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
index 16901911f..cbc26c1dc 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetRecordService.java
@@ -83,7 +83,7 @@ public class OffsetRecordService extends
AbstractDaemonService {
return;
}
// get group offset information
- Map<String, OffsetRecordInfo> groupOffsetMap =
+ Map<String, OffsetHistoryInfo> groupOffsetMap =
offsetManager.getOnlineGroupOffsetInfo();
if (groupOffsetMap == null || groupOffsetMap.isEmpty()) {
return;
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetService.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetService.java
index b6e342276..1b22163d4 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetService.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/offset/OffsetService.java
@@ -68,7 +68,7 @@ public interface OffsetService {
Map<String, Map<Integer, Tuple2<Long, Long>>> queryGroupOffset(
String group, Map<String, Set<Integer>> topicPartMap);
- Map<String, OffsetRecordInfo> getOnlineGroupOffsetInfo();
+ Map<String, OffsetHistoryInfo> getOnlineGroupOffsetInfo();
boolean modifyGroupOffset(Set<String> groups,
List<Tuple3<String, Integer, Long>>
topicPartOffsets,
diff --git
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
index a7117983a..f67ccb86a 100644
---
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
+++
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/TServerConstants.java
@@ -109,4 +109,5 @@ public final class TServerConstants {
public static final int META_MAX_STATSTYPE_LENGTH = 256;
public static final int OFFSET_HISTORY_RECORD_VERSION = 1;
+ public static final int OFFSET_HISTORY_RECORD_SHORT_VERSION = 2;
}