This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1fe5d62334 [ISSUE #7074] Allow a BoundaryType to be specified when 
retrieving offset based on the timestamp (#7082)
1fe5d62334 is described below

commit 1fe5d6233455f191d7195fb5f6e27dc46510dd3e
Author: koado <[email protected]>
AuthorDate: Thu Aug 3 11:40:16 2023 +0800

    [ISSUE #7074] Allow a BoundaryType to be specified when retrieving offset 
based on the timestamp (#7082)
    
    * add new interface for searching offset with boundary type
    
    * format code
    
    * fix failed test
    
    * unify two BoundaryType class
    
    * add interface getOffsetInQueueByTime(long timestamp, BoundaryType 
boundaryTYpe) in ConsumeQueueInterface
    
    * fix AdminBrokerProcessorTest unnecessary Mockito stubbings
---
 .../broker/processor/AdminBrokerProcessor.java     |  4 +-
 .../broker/processor/AdminBrokerProcessorTest.java |  3 +-
 .../apache/rocketmq/client/impl/MQAdminImpl.java   |  9 +++-
 .../rocketmq/client/impl/MQClientAPIImpl.java      | 10 ++++-
 .../remoting/protocol/RemotingCommand.java         |  4 ++
 .../protocol/header/SearchOffsetRequestHeader.java | 13 ++++++
 .../org/apache/rocketmq/store/ConsumeQueue.java    |  2 +
 .../apache/rocketmq/store/DefaultMessageStore.java |  7 ++-
 .../org/apache/rocketmq/store/MessageStore.java    | 12 +++++
 .../rocketmq/store/queue/BatchConsumeQueue.java    | 37 ++++++++++++----
 .../store/queue/ConsumeQueueInterface.java         | 10 +++++
 .../rocketmq/store/queue/SparseConsumeQueue.java   |  3 +-
 .../rocketmq/tieredstore/MessageStoreFetcher.java  |  2 +-
 .../rocketmq/tieredstore/TieredMessageFetcher.java |  2 +-
 .../rocketmq/tieredstore/TieredMessageStore.java   |  3 +-
 .../rocketmq/tieredstore/common/BoundaryType.java  | 51 ----------------------
 .../rocketmq/tieredstore/file/CompositeAccess.java |  2 +-
 .../tieredstore/file/CompositeFlatFile.java        |  2 +-
 .../tieredstore/file/TieredConsumeQueue.java       |  2 +-
 .../rocketmq/tieredstore/file/TieredFlatFile.java  |  2 +-
 .../tieredstore/TieredMessageFetcherTest.java      |  2 +-
 .../tieredstore/TieredMessageStoreTest.java        |  2 +-
 .../file/CompositeQueueFlatFileTest.java           |  2 +-
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |  9 ++++
 .../tools/admin/DefaultMQAdminExtImpl.java         |  5 +++
 .../tools/admin/DefaultMQAdminExtTest.java         | 30 +++++++++++++
 26 files changed, 154 insertions(+), 76 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 569a1c57bd..a6ce03dc29 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -994,7 +994,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
                     continue;
                 }
                 if (mappingDetail.getBname().equals(item.getBname())) {
-                    offset = 
this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(),
 item.getQueueId(), timestamp);
+                    offset = 
this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(),
 item.getQueueId(), timestamp, requestHeader.getBoundaryType());
                     if (offset > 0) {
                         offset = item.computeStaticQueueOffsetStrictly(offset);
                         break;
@@ -1045,7 +1045,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         }
 
         long offset = 
this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(),
 requestHeader.getQueueId(),
-            requestHeader.getTimestamp());
+            requestHeader.getTimestamp(), requestHeader.getBoundaryType());
 
         responseHeader.setOffset(offset);
 
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index fa2d929b0c..a470c0cf22 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ConsumerManager;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -311,7 +312,7 @@ public class AdminBrokerProcessorTest {
     @Test
     public void testSearchOffsetByTimestamp() throws Exception {
         messageStore = mock(MessageStore.class);
-        when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), 
anyLong())).thenReturn(Long.MIN_VALUE);
+        when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), 
anyLong(), any(BoundaryType.class))).thenReturn(Long.MIN_VALUE);
         when(brokerController.getMessageStore()).thenReturn(messageStore);
         SearchOffsetRequestHeader searchOffsetRequestHeader = new 
SearchOffsetRequestHeader();
         searchOffsetRequestHeader.setTopic("topic");
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 33fc44fd63..1ef3a94835 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.help.FAQUrl;
@@ -184,6 +185,11 @@ public class MQAdminImpl {
     }
 
     public long searchOffset(MessageQueue mq, long timestamp) throws 
MQClientException {
+        // default return lower boundary offset when there are more than one 
offsets.
+        return searchOffset(mq, timestamp, BoundaryType.LOWER);
+    }
+
+    public long searchOffset(MessageQueue mq, long timestamp, BoundaryType 
boundaryType) throws MQClientException {
         String brokerAddr = 
this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
         if (null == brokerAddr) {
             
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
@@ -192,7 +198,8 @@ public class MQAdminImpl {
 
         if (brokerAddr != null) {
             try {
-                return 
this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, 
timestamp, timeoutMillis);
+                return 
this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq, 
timestamp,
+                        boundaryType, timeoutMillis);
             } catch (Exception e) {
                 throw new MQClientException("Invoke Broker[" + brokerAddr + "] 
exception", e);
             }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 4c9c3a1699..708a6acd1d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -76,6 +76,7 @@ import 
org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -1237,13 +1238,20 @@ public class MQClientAPIImpl implements 
NameServerUpdateCallback {
     public long searchOffset(final String addr, final MessageQueue 
messageQueue, final long timestamp,
         final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
+        // default return lower boundary offset when there are more than one 
offsets.
+        return searchOffset(addr, messageQueue, timestamp, BoundaryType.LOWER, 
timeoutMillis);
+    }
+
+    public long searchOffset(final String addr, final MessageQueue 
messageQueue, final long timestamp,
+        final BoundaryType boundaryType, final long timeoutMillis)
+        throws RemotingException, MQBrokerException, InterruptedException {
         SearchOffsetRequestHeader requestHeader = new 
SearchOffsetRequestHeader();
         requestHeader.setTopic(messageQueue.getTopic());
         requestHeader.setQueueId(messageQueue.getQueueId());
         requestHeader.setBname(messageQueue.getBrokerName());
         requestHeader.setTimestamp(timestamp);
+        requestHeader.setBoundaryType(boundaryType);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, 
requestHeader);
-
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
             request, timeoutMillis);
         assert response != null;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index a6ed022eae..d27135132c 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -63,6 +64,7 @@ public class RemotingCommand {
     private static final String LONG_CANONICAL_NAME_2 = 
long.class.getCanonicalName();
     private static final String BOOLEAN_CANONICAL_NAME_1 = 
Boolean.class.getCanonicalName();
     private static final String BOOLEAN_CANONICAL_NAME_2 = 
boolean.class.getCanonicalName();
+    private static final String BOUNDARY_TYPE_CANONICAL_NAME = 
BoundaryType.class.getCanonicalName();
     private static volatile int configVersion = -1;
     private static AtomicInteger requestId = new AtomicInteger(0);
 
@@ -311,6 +313,8 @@ public class RemotingCommand {
                                 valueParsed = Boolean.parseBoolean(value);
                             } else if (type.equals(DOUBLE_CANONICAL_NAME_1) || 
type.equals(DOUBLE_CANONICAL_NAME_2)) {
                                 valueParsed = Double.parseDouble(value);
+                            } else if 
(type.equals(BOUNDARY_TYPE_CANONICAL_NAME)) {
+                                valueParsed = BoundaryType.getType(value);
                             } else {
                                 throw new RemotingCommandException("the custom 
field <" + fieldName + "> type is not supported");
                             }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
index 0c644d7393..79c3d337be 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
@@ -21,6 +21,7 @@
 package org.apache.rocketmq.remoting.protocol.header;
 
 import com.google.common.base.MoreObjects;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
@@ -33,6 +34,8 @@ public class SearchOffsetRequestHeader extends 
TopicQueueRequestHeader {
     @CFNotNull
     private Long timestamp;
 
+    private BoundaryType boundaryType;
+
     @Override
     public void checkFields() throws RemotingCommandException {
 
@@ -66,12 +69,22 @@ public class SearchOffsetRequestHeader extends 
TopicQueueRequestHeader {
         this.timestamp = timestamp;
     }
 
+    public BoundaryType getBoundaryType() {
+        // default return LOWER
+        return boundaryType == null ? BoundaryType.LOWER : boundaryType;
+    }
+
+    public void setBoundaryType(BoundaryType boundaryType) {
+        this.boundaryType = boundaryType;
+    }
+
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
             .add("topic", topic)
             .add("queueId", queueId)
             .add("timestamp", timestamp)
+            .add("boundaryType", boundaryType.getName())
             .toString();
     }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 0c44ad043f..a0b886eb0e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -204,6 +204,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
         return CQ_STORE_UNIT_SIZE;
     }
 
+    @Deprecated
     @Override
     public long getOffsetInQueueByTime(final long timestamp) {
         MappedFile mappedFile = 
this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
@@ -211,6 +212,7 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
         return binarySearchInQueueByTime(mappedFile, timestamp, 
BoundaryType.LOWER);
     }
 
+    @Override
     public long getOffsetInQueueByTime(final long timestamp, final 
BoundaryType boundaryType) {
         MappedFile mappedFile = 
this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
             messageStore.getCommitLog(), boundaryType);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index acc1610e08..25e4a166f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -82,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
 import org.apache.rocketmq.common.utils.QueueTypeUtils;
 import org.apache.rocketmq.common.utils.ServiceProvider;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -1015,9 +1016,13 @@ public class DefaultMessageStore implements MessageStore 
{
 
     @Override
     public long getOffsetInQueueByTime(String topic, int queueId, long 
timestamp) {
+        return getOffsetInQueueByTime(topic, queueId, timestamp, 
BoundaryType.LOWER);
+    }
+
+    public long getOffsetInQueueByTime(String topic, int queueId, long 
timestamp, BoundaryType boundaryType) {
         ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
         if (logic != null) {
-            long resultOffset = logic.getOffsetInQueueByTime(timestamp);
+            long resultOffset = logic.getOffsetInQueueByTime(timestamp, 
boundaryType);
             // Make sure the result offset is in valid range.
             resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue());
             resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue());
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 3db0c18f7f..31bbb907f4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.SystemClock;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -226,6 +227,17 @@ public interface MessageStore {
      */
     long getOffsetInQueueByTime(final String topic, final int queueId, final 
long timestamp);
 
+    /**
+     * Look up the physical offset of the message whose store timestamp is as 
specified with specific boundaryType.
+     *
+     * @param topic        Topic of the message.
+     * @param queueId      Queue ID.
+     * @param timestamp    Timestamp to look up.
+     * @param boundaryType Lower or Upper
+     * @return physical offset which matches.
+     */
+    long getOffsetInQueueByTime(final String topic, final int queueId, final 
long timestamp, final BoundaryType boundaryType);
+
     /**
      * Look up the message by given commit log offset.
      *
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 8fec1bf7b0..387c233bf5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Function;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageAccessor;
@@ -708,8 +709,14 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
      * @param timestamp
      * @return
      */
+    @Deprecated
     @Override
     public long getOffsetInQueueByTime(final long timestamp) {
+        return getOffsetInQueueByTime(timestamp, BoundaryType.LOWER);
+    }
+
+    @Override
+    public long getOffsetInQueueByTime(long timestamp, BoundaryType 
boundaryType) {
         MappedFile targetBcq;
         BatchOffsetIndex targetMinOffset;
 
@@ -760,7 +767,7 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
             if (timestamp >= maxQueueTimestamp) {
                 return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX);
             }
-            int mid = binarySearchRight(byteBuffer, left, right, 
CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp);
+            int mid = binarySearchRight(byteBuffer, left, right, 
CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp, boundaryType);
             if (mid != -1) {
                 return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX);
             }
@@ -819,11 +826,11 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
 
     /**
      * Find the offset of which the value is equal or larger than the given 
targetValue.
-     * If there are many values equal to the target, then find the earliest 
one.
+     * If there are many values equal to the target, then return the lowest 
offset if boundaryType is LOWER while
+     * return the highest offset if boundaryType is UPPER.
      */
     public static int binarySearchRight(ByteBuffer byteBuffer, int left, int 
right, final int unitSize,
-        final int unitShift,
-        long targetValue) {
+        final int unitShift, long targetValue, BoundaryType boundaryType) {
         int mid = -1;
         while (left <= right) {
             mid = ceil((left + right) / 2);
@@ -844,10 +851,24 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
                 }
             } else {
                 //mid is actually in the mid
-                if (tmpValue < targetValue) {
-                    left = mid + unitSize;
-                } else {
-                    right = mid;
+                switch (boundaryType) {
+                    case LOWER:
+                        if (tmpValue < targetValue) {
+                            left = mid + unitSize;
+                        } else {
+                            right = mid;
+                        }
+                        break;
+                    case UPPER:
+                        if (tmpValue <= targetValue) {
+                            left = mid;
+                        } else {
+                            right = mid - unitSize;
+                        }
+                        break;
+                    default:
+                        log.warn("Unknown boundary type");
+                        return -1;
                 }
             }
         }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index d7213fa37a..55d0808292 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.store.queue;
 
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.attribute.CQType;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.DispatchRequest;
@@ -93,6 +94,15 @@ public interface ConsumeQueueInterface extends 
FileQueueLifeCycle {
      */
     long getOffsetInQueueByTime(final long timestamp);
 
+    /**
+     * Get the message whose timestamp is the smallest, greater than or equal 
to the given time and when there are more
+     * than one message satisfy the condition, decide which one to return 
based on boundaryType.
+     * @param timestamp    timestamp
+     * @param boundaryType Lower or Upper
+     * @return the offset(index)
+     */
+    long getOffsetInQueueByTime(final long timestamp, final BoundaryType 
boundaryType);
+
     /**
      * The max physical offset of commitlog has been dispatched to this queue.
      * It should be exclusive.
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
index 5b397d696b..4a5f3a93b1 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.store.queue;
 
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.store.MessageStore;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -148,7 +149,7 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
             ByteBuffer byteBuffer = sbr.getByteBuffer();
             int left = minOffset.getIndexPos();
             int right = maxOffset.getIndexPos();
-            int mid = binarySearchRight(byteBuffer, left, right, 
CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset);
+            int mid = binarySearchRight(byteBuffer, left, right, 
CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset, BoundaryType.LOWER);
             if (mid != -1) {
                 return minOffset.getMappedFile().selectMappedBuffer(mid);
             }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
index f4d576d29d..8ae4dc7f9e 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
@@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.QueryMessageResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
+import org.apache.rocketmq.common.BoundaryType;
 
 public interface MessageStoreFetcher {
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index c4fed54bd7..9a9a3e5a5c 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -39,7 +39,6 @@ import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.MessageFilter;
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
 import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
 import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
@@ -59,6 +58,7 @@ import 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
 import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
 
 public class TieredMessageFetcher implements MessageStoreFetcher {
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 115d9640d6..1f12410f2e 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.PopAckConstants;
@@ -45,7 +46,6 @@ import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
 import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
 import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
@@ -287,6 +287,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
         return getOffsetInQueueByTime(topic, queueId, timestamp, 
BoundaryType.LOWER);
     }
 
+    @Override
     public long getOffsetInQueueByTime(String topic, int queueId, long 
timestamp, BoundaryType boundaryType) {
         long earliestTimeInNextStore = next.getEarliestMessageTime();
         if (earliestTimeInNextStore <= 0) {
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
deleted file mode 100644
index 77e53ec113..0000000000
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.tieredstore.common;
-
-/**
- *  This enumeration represents the boundary types.
- *  It has two constants, lower and upper, which represent the lower and upper 
boundaries respectively.
- */
-public enum BoundaryType {
-
-    /**
-     * Represents the lower boundary.
-     */
-    LOWER("lower"),
-
-    /**
-     * Represents the upper boundary.
-     */
-    UPPER("upper");
-
-    private final String name;
-
-    BoundaryType(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public static BoundaryType getType(String name) {
-        if (BoundaryType.UPPER.getName().equalsIgnoreCase(name)) {
-            return UPPER;
-        }
-        return LOWER;
-    }
-}
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
index bc1062cd01..3d962e40d6 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
+import org.apache.rocketmq.common.BoundaryType;
 
 interface CompositeAccess {
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index fa01382e17..df4baf33f4 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -37,7 +37,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
 import org.apache.rocketmq.tieredstore.common.InFlightRequestKey;
@@ -46,6 +45,7 @@ import 
org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
 import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
 
 public class CompositeFlatFile implements CompositeAccess {
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
index ff9572af6f..35007f8cbf 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
@@ -21,8 +21,8 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.common.BoundaryType;
 
 public class TieredConsumeQueue {
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index 90ca843bf6..75ce8d89f2 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
@@ -42,6 +41,7 @@ import 
org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
 import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
 
 public class TieredFlatFile {
 
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index df3720babd..d75b2f9164 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -30,7 +30,6 @@ import org.apache.rocketmq.store.GetMessageStatus;
 import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
@@ -41,6 +40,7 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 58b7a52cc9..8601392e74 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -37,11 +37,11 @@ import org.apache.rocketmq.store.QueryMessageResult;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
 import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
 import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 8322c72edf..27efe111e6 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -23,7 +23,6 @@ import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
@@ -33,6 +32,7 @@ import 
org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index dd9c6a9b45..f0a08dfb1a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -122,6 +123,14 @@ public class DefaultMQAdminExt extends ClientConfig 
implements MQAdminExt {
         return defaultMQAdminExtImpl.searchOffset(mq, timestamp);
     }
 
+    public long searchLowerBoundaryOffset(MessageQueue mq, long timestamp) 
throws MQClientException {
+        return defaultMQAdminExtImpl.searchOffset(mq, timestamp, 
BoundaryType.LOWER);
+    }
+
+    public long searchUpperBoundaryOffset(MessageQueue mq, long timestamp) 
throws MQClientException {
+        return defaultMQAdminExtImpl.searchOffset(mq, timestamp, 
BoundaryType.UPPER);
+    }
+
     @Override
     public long maxOffset(MessageQueue mq) throws MQClientException {
         return defaultMQAdminExtImpl.maxOffset(mq);
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index c5c467bf00..fa3596d51c 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.message.MessageRequestMode;
 import org.apache.rocketmq.common.namesrv.NamesrvUtil;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -1700,6 +1701,10 @@ public class DefaultMQAdminExtImpl implements 
MQAdminExt, MQAdminExtInner {
         return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, 
timestamp);
     }
 
+    public long searchOffset(MessageQueue mq, long timestamp, BoundaryType 
boundaryType) throws MQClientException {
+        return this.mqClientInstance.getMQAdminImpl().searchOffset(mq, 
timestamp, boundaryType);
+    }
+
     @Override
     public long maxOffset(MessageQueue mq) throws MQClientException {
         return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index b94754f22d..dc5642f88c 100644
--- 
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -512,6 +512,36 @@ public class DefaultMQAdminExtTest {
         assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(TOPIC1, 
BROKER1_NAME, 0), System.currentTimeMillis())).isEqualTo(101L);
     }
 
+    @Test
+    public void testSearchOffsetWithSpecificBoundaryType() throws Exception {
+        // do mock
+        DefaultMQAdminExt mockDefaultMQAdminExt = 
mock(DefaultMQAdminExt.class);
+        
when(mockDefaultMQAdminExt.minOffset(any(MessageQueue.class))).thenReturn(0L);
+        
when(mockDefaultMQAdminExt.maxOffset(any(MessageQueue.class))).thenReturn(101L);
+        
when(mockDefaultMQAdminExt.searchLowerBoundaryOffset(any(MessageQueue.class), 
anyLong())).thenReturn(0L);
+        
when(mockDefaultMQAdminExt.searchUpperBoundaryOffset(any(MessageQueue.class), 
anyLong())).thenReturn(100L);
+        when(mockDefaultMQAdminExt.queryConsumeTimeSpan(anyString(), 
anyString())).thenReturn(mockQueryConsumeTimeSpan());
+
+        for (QueueTimeSpan timeSpan: 
mockDefaultMQAdminExt.queryConsumeTimeSpan(TOPIC1, "group_one")) {
+            MessageQueue mq = timeSpan.getMessageQueue();
+            long maxOffset = mockDefaultMQAdminExt.maxOffset(mq);
+            long minOffset = mockDefaultMQAdminExt.minOffset(mq);
+            // if there is at least one message in queue, the maxOffset 
returns the queue's latest offset + 1
+            assertThat((maxOffset == 0 ? 0 : maxOffset - 1) == 
mockDefaultMQAdminExt.searchUpperBoundaryOffset(mq, 
timeSpan.getMaxTimeStamp())).isTrue();
+            assertThat(minOffset == 
mockDefaultMQAdminExt.searchLowerBoundaryOffset(mq, 
timeSpan.getMinTimeStamp())).isTrue();
+        }
+    }
+
+    private List<QueueTimeSpan> mockQueryConsumeTimeSpan() {
+        List<QueueTimeSpan> spanSet = new ArrayList<>();
+        QueueTimeSpan timeSpan = new QueueTimeSpan();
+        timeSpan.setMessageQueue(new MessageQueue(TOPIC1, BROKER1_NAME, 0));
+        timeSpan.setMinTimeStamp(1690421253000L);
+        timeSpan.setMaxTimeStamp(1690507653000L);
+        spanSet.add(timeSpan);
+        return spanSet;
+    }
+
     @Test
     public void testExamineTopicConfig() throws MQBrokerException, 
RemotingException, InterruptedException {
         TopicConfig topicConfig = 
defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911", 
"topic_test_examine_topicConfig");


Reply via email to