This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1fe5d62334 [ISSUE #7074] Allow a BoundaryType to be specified when
retrieving offset based on the timestamp (#7082)
1fe5d62334 is described below
commit 1fe5d6233455f191d7195fb5f6e27dc46510dd3e
Author: koado <[email protected]>
AuthorDate: Thu Aug 3 11:40:16 2023 +0800
[ISSUE #7074] Allow a BoundaryType to be specified when retrieving offset
based on the timestamp (#7082)
* add new interface for searching offset with boundary type
* format code
* fix failed test
* unify two BoundaryType class
* add interface getOffsetInQueueByTime(long timestamp, BoundaryType
boundaryTYpe) in ConsumeQueueInterface
* fix AdminBrokerProcessorTest unnecessary Mockito stubbings
---
.../broker/processor/AdminBrokerProcessor.java | 4 +-
.../broker/processor/AdminBrokerProcessorTest.java | 3 +-
.../apache/rocketmq/client/impl/MQAdminImpl.java | 9 +++-
.../rocketmq/client/impl/MQClientAPIImpl.java | 10 ++++-
.../remoting/protocol/RemotingCommand.java | 4 ++
.../protocol/header/SearchOffsetRequestHeader.java | 13 ++++++
.../org/apache/rocketmq/store/ConsumeQueue.java | 2 +
.../apache/rocketmq/store/DefaultMessageStore.java | 7 ++-
.../org/apache/rocketmq/store/MessageStore.java | 12 +++++
.../rocketmq/store/queue/BatchConsumeQueue.java | 37 ++++++++++++----
.../store/queue/ConsumeQueueInterface.java | 10 +++++
.../rocketmq/store/queue/SparseConsumeQueue.java | 3 +-
.../rocketmq/tieredstore/MessageStoreFetcher.java | 2 +-
.../rocketmq/tieredstore/TieredMessageFetcher.java | 2 +-
.../rocketmq/tieredstore/TieredMessageStore.java | 3 +-
.../rocketmq/tieredstore/common/BoundaryType.java | 51 ----------------------
.../rocketmq/tieredstore/file/CompositeAccess.java | 2 +-
.../tieredstore/file/CompositeFlatFile.java | 2 +-
.../tieredstore/file/TieredConsumeQueue.java | 2 +-
.../rocketmq/tieredstore/file/TieredFlatFile.java | 2 +-
.../tieredstore/TieredMessageFetcherTest.java | 2 +-
.../tieredstore/TieredMessageStoreTest.java | 2 +-
.../file/CompositeQueueFlatFileTest.java | 2 +-
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 9 ++++
.../tools/admin/DefaultMQAdminExtImpl.java | 5 +++
.../tools/admin/DefaultMQAdminExtTest.java | 30 +++++++++++++
26 files changed, 154 insertions(+), 76 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 569a1c57bd..a6ce03dc29 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -994,7 +994,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
continue;
}
if (mappingDetail.getBname().equals(item.getBname())) {
- offset =
this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(),
item.getQueueId(), timestamp);
+ offset =
this.brokerController.getMessageStore().getOffsetInQueueByTime(mappingContext.getTopic(),
item.getQueueId(), timestamp, requestHeader.getBoundaryType());
if (offset > 0) {
offset = item.computeStaticQueueOffsetStrictly(offset);
break;
@@ -1045,7 +1045,7 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
}
long offset =
this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(),
requestHeader.getQueueId(),
- requestHeader.getTimestamp());
+ requestHeader.getTimestamp(), requestHeader.getBoundaryType());
responseHeader.setOffset(offset);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index fa2d929b0c..a470c0cf22 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.broker.client.ConsumerManager;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.schedule.ScheduleMessageService;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -311,7 +312,7 @@ public class AdminBrokerProcessorTest {
@Test
public void testSearchOffsetByTimestamp() throws Exception {
messageStore = mock(MessageStore.class);
- when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(),
anyLong())).thenReturn(Long.MIN_VALUE);
+ when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(),
anyLong(), any(BoundaryType.class))).thenReturn(Long.MIN_VALUE);
when(brokerController.getMessageStore()).thenReturn(messageStore);
SearchOffsetRequestHeader searchOffsetRequestHeader = new
SearchOffsetRequestHeader();
searchOffsetRequestHeader.setTopic("topic");
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 33fc44fd63..1ef3a94835 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.help.FAQUrl;
@@ -184,6 +185,11 @@ public class MQAdminImpl {
}
public long searchOffset(MessageQueue mq, long timestamp) throws
MQClientException {
+ // default return lower boundary offset when there are more than one
offsets.
+ return searchOffset(mq, timestamp, BoundaryType.LOWER);
+ }
+
+ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType
boundaryType) throws MQClientException {
String brokerAddr =
this.mQClientFactory.findBrokerAddressInPublish(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
@@ -192,7 +198,8 @@ public class MQAdminImpl {
if (brokerAddr != null) {
try {
- return
this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq,
timestamp, timeoutMillis);
+ return
this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr, mq,
timestamp,
+ boundaryType, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "]
exception", e);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 4c9c3a1699..708a6acd1d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -76,6 +76,7 @@ import
org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -1237,13 +1238,20 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
public long searchOffset(final String addr, final MessageQueue
messageQueue, final long timestamp,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
+ // default return lower boundary offset when there are more than one
offsets.
+ return searchOffset(addr, messageQueue, timestamp, BoundaryType.LOWER,
timeoutMillis);
+ }
+
+ public long searchOffset(final String addr, final MessageQueue
messageQueue, final long timestamp,
+ final BoundaryType boundaryType, final long timeoutMillis)
+ throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new
SearchOffsetRequestHeader();
requestHeader.setTopic(messageQueue.getTopic());
requestHeader.setQueueId(messageQueue.getQueueId());
requestHeader.setBname(messageQueue.getBrokerName());
requestHeader.setTimestamp(timestamp);
+ requestHeader.setBoundaryType(boundaryType);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP,
requestHeader);
-
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
addr),
request, timeoutMillis);
assert response != null;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index a6ed022eae..d27135132c 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -63,6 +64,7 @@ public class RemotingCommand {
private static final String LONG_CANONICAL_NAME_2 =
long.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_1 =
Boolean.class.getCanonicalName();
private static final String BOOLEAN_CANONICAL_NAME_2 =
boolean.class.getCanonicalName();
+ private static final String BOUNDARY_TYPE_CANONICAL_NAME =
BoundaryType.class.getCanonicalName();
private static volatile int configVersion = -1;
private static AtomicInteger requestId = new AtomicInteger(0);
@@ -311,6 +313,8 @@ public class RemotingCommand {
valueParsed = Boolean.parseBoolean(value);
} else if (type.equals(DOUBLE_CANONICAL_NAME_1) ||
type.equals(DOUBLE_CANONICAL_NAME_2)) {
valueParsed = Double.parseDouble(value);
+ } else if
(type.equals(BOUNDARY_TYPE_CANONICAL_NAME)) {
+ valueParsed = BoundaryType.getType(value);
} else {
throw new RemotingCommandException("the custom
field <" + fieldName + "> type is not supported");
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
index 0c644d7393..79c3d337be 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/SearchOffsetRequestHeader.java
@@ -21,6 +21,7 @@
package org.apache.rocketmq.remoting.protocol.header;
import com.google.common.base.MoreObjects;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.rpc.TopicQueueRequestHeader;
@@ -33,6 +34,8 @@ public class SearchOffsetRequestHeader extends
TopicQueueRequestHeader {
@CFNotNull
private Long timestamp;
+ private BoundaryType boundaryType;
+
@Override
public void checkFields() throws RemotingCommandException {
@@ -66,12 +69,22 @@ public class SearchOffsetRequestHeader extends
TopicQueueRequestHeader {
this.timestamp = timestamp;
}
+ public BoundaryType getBoundaryType() {
+ // default return LOWER
+ return boundaryType == null ? BoundaryType.LOWER : boundaryType;
+ }
+
+ public void setBoundaryType(BoundaryType boundaryType) {
+ this.boundaryType = boundaryType;
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("topic", topic)
.add("queueId", queueId)
.add("timestamp", timestamp)
+ .add("boundaryType", boundaryType.getName())
.toString();
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 0c44ad043f..a0b886eb0e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -204,6 +204,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
return CQ_STORE_UNIT_SIZE;
}
+ @Deprecated
@Override
public long getOffsetInQueueByTime(final long timestamp) {
MappedFile mappedFile =
this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
@@ -211,6 +212,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
return binarySearchInQueueByTime(mappedFile, timestamp,
BoundaryType.LOWER);
}
+ @Override
public long getOffsetInQueueByTime(final long timestamp, final
BoundaryType boundaryType) {
MappedFile mappedFile =
this.mappedFileQueue.getConsumeQueueMappedFileByTime(timestamp,
messageStore.getCommitLog(), boundaryType);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index acc1610e08..25e4a166f3 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -82,6 +82,7 @@ import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.CleanupPolicyUtils;
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.common.utils.ServiceProvider;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
@@ -1015,9 +1016,13 @@ public class DefaultMessageStore implements MessageStore
{
@Override
public long getOffsetInQueueByTime(String topic, int queueId, long
timestamp) {
+ return getOffsetInQueueByTime(topic, queueId, timestamp,
BoundaryType.LOWER);
+ }
+
+ public long getOffsetInQueueByTime(String topic, int queueId, long
timestamp, BoundaryType boundaryType) {
ConsumeQueueInterface logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
- long resultOffset = logic.getOffsetInQueueByTime(timestamp);
+ long resultOffset = logic.getOffsetInQueueByTime(timestamp,
boundaryType);
// Make sure the result offset is in valid range.
resultOffset = Math.max(resultOffset, logic.getMinOffsetInQueue());
resultOffset = Math.min(resultOffset, logic.getMaxOffsetInQueue());
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 3db0c18f7f..31bbb907f4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.SystemClock;
import org.apache.rocketmq.common.message.MessageExt;
@@ -226,6 +227,17 @@ public interface MessageStore {
*/
long getOffsetInQueueByTime(final String topic, final int queueId, final
long timestamp);
+ /**
+ * Look up the physical offset of the message whose store timestamp is as
specified with specific boundaryType.
+ *
+ * @param topic Topic of the message.
+ * @param queueId Queue ID.
+ * @param timestamp Timestamp to look up.
+ * @param boundaryType Lower or Upper
+ * @return physical offset which matches.
+ */
+ long getOffsetInQueueByTime(final String topic, final int queueId, final
long timestamp, final BoundaryType boundaryType);
+
/**
* Look up the message by given commit log offset.
*
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 8fec1bf7b0..387c233bf5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
@@ -708,8 +709,14 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
* @param timestamp
* @return
*/
+ @Deprecated
@Override
public long getOffsetInQueueByTime(final long timestamp) {
+ return getOffsetInQueueByTime(timestamp, BoundaryType.LOWER);
+ }
+
+ @Override
+ public long getOffsetInQueueByTime(long timestamp, BoundaryType
boundaryType) {
MappedFile targetBcq;
BatchOffsetIndex targetMinOffset;
@@ -760,7 +767,7 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
if (timestamp >= maxQueueTimestamp) {
return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX);
}
- int mid = binarySearchRight(byteBuffer, left, right,
CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp);
+ int mid = binarySearchRight(byteBuffer, left, right,
CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp, boundaryType);
if (mid != -1) {
return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX);
}
@@ -819,11 +826,11 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
/**
* Find the offset of which the value is equal or larger than the given
targetValue.
- * If there are many values equal to the target, then find the earliest
one.
+ * If there are many values equal to the target, then return the lowest
offset if boundaryType is LOWER while
+ * return the highest offset if boundaryType is UPPER.
*/
public static int binarySearchRight(ByteBuffer byteBuffer, int left, int
right, final int unitSize,
- final int unitShift,
- long targetValue) {
+ final int unitShift, long targetValue, BoundaryType boundaryType) {
int mid = -1;
while (left <= right) {
mid = ceil((left + right) / 2);
@@ -844,10 +851,24 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
}
} else {
//mid is actually in the mid
- if (tmpValue < targetValue) {
- left = mid + unitSize;
- } else {
- right = mid;
+ switch (boundaryType) {
+ case LOWER:
+ if (tmpValue < targetValue) {
+ left = mid + unitSize;
+ } else {
+ right = mid;
+ }
+ break;
+ case UPPER:
+ if (tmpValue <= targetValue) {
+ left = mid;
+ } else {
+ right = mid - unitSize;
+ }
+ break;
+ default:
+ log.warn("Unknown boundary type");
+ return -1;
}
}
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
index d7213fa37a..55d0808292 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueInterface.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store.queue;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.attribute.CQType;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.DispatchRequest;
@@ -93,6 +94,15 @@ public interface ConsumeQueueInterface extends
FileQueueLifeCycle {
*/
long getOffsetInQueueByTime(final long timestamp);
+ /**
+ * Get the message whose timestamp is the smallest, greater than or equal
to the given time and when there are more
+ * than one message satisfy the condition, decide which one to return
based on boundaryType.
+ * @param timestamp timestamp
+ * @param boundaryType Lower or Upper
+ * @return the offset(index)
+ */
+ long getOffsetInQueueByTime(final long timestamp, final BoundaryType
boundaryType);
+
/**
* The max physical offset of commitlog has been dispatched to this queue.
* It should be exclusive.
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
index 5b397d696b..4a5f3a93b1 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/SparseConsumeQueue.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.store.queue;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.SelectMappedBufferResult;
@@ -148,7 +149,7 @@ public class SparseConsumeQueue extends BatchConsumeQueue {
ByteBuffer byteBuffer = sbr.getByteBuffer();
int left = minOffset.getIndexPos();
int right = maxOffset.getIndexPos();
- int mid = binarySearchRight(byteBuffer, left, right,
CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset);
+ int mid = binarySearchRight(byteBuffer, left, right,
CQ_STORE_UNIT_SIZE, MSG_BASE_OFFSET_INDEX, msgOffset, BoundaryType.LOWER);
if (mid != -1) {
return minOffset.getMappedFile().selectMappedBuffer(mid);
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
index f4d576d29d..8ae4dc7f9e 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreFetcher.java
@@ -21,7 +21,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.QueryMessageResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
+import org.apache.rocketmq.common.BoundaryType;
public interface MessageStoreFetcher {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index c4fed54bd7..9a9a3e5a5c 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -39,7 +39,6 @@ import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
import org.apache.rocketmq.tieredstore.common.MessageCacheKey;
import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
@@ -59,6 +58,7 @@ import
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
public class TieredMessageFetcher implements MessageStoreFetcher {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 115d9640d6..1f12410f2e 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.PopAckConstants;
@@ -45,7 +46,6 @@ import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.file.CompositeFlatFile;
@@ -287,6 +287,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
return getOffsetInQueueByTime(topic, queueId, timestamp,
BoundaryType.LOWER);
}
+ @Override
public long getOffsetInQueueByTime(String topic, int queueId, long
timestamp, BoundaryType boundaryType) {
long earliestTimeInNextStore = next.getEarliestMessageTime();
if (earliestTimeInNextStore <= 0) {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
deleted file mode 100644
index 77e53ec113..0000000000
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/BoundaryType.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.tieredstore.common;
-
-/**
- * This enumeration represents the boundary types.
- * It has two constants, lower and upper, which represent the lower and upper
boundaries respectively.
- */
-public enum BoundaryType {
-
- /**
- * Represents the lower boundary.
- */
- LOWER("lower"),
-
- /**
- * Represents the upper boundary.
- */
- UPPER("upper");
-
- private final String name;
-
- BoundaryType(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- public static BoundaryType getType(String name) {
- if (BoundaryType.UPPER.getName().equalsIgnoreCase(name)) {
- return UPPER;
- }
- return LOWER;
- }
-}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
index bc1062cd01..3d962e40d6 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeAccess.java
@@ -20,7 +20,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
+import org.apache.rocketmq.common.BoundaryType;
interface CompositeAccess {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
index fa01382e17..df4baf33f4 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/CompositeFlatFile.java
@@ -37,7 +37,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.InFlightRequestFuture;
import org.apache.rocketmq.tieredstore.common.InFlightRequestKey;
@@ -46,6 +45,7 @@ import
org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.CQItemBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
public class CompositeFlatFile implements CompositeAccess {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
index ff9572af6f..35007f8cbf 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredConsumeQueue.java
@@ -21,8 +21,8 @@ import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.common.BoundaryType;
public class TieredConsumeQueue {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index 90ca843bf6..75ce8d89f2 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
@@ -42,6 +41,7 @@ import
org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
public class TieredFlatFile {
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index df3720babd..d75b2f9164 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -30,7 +30,6 @@ import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
@@ -41,6 +40,7 @@ import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 58b7a52cc9..8601392e74 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -37,11 +37,11 @@ import org.apache.rocketmq.store.QueryMessageResult;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
import org.apache.rocketmq.tieredstore.file.CompositeQueueFlatFile;
import org.apache.rocketmq.tieredstore.file.TieredFlatFileManager;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
index 8322c72edf..27efe111e6 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/CompositeQueueFlatFileTest.java
@@ -23,7 +23,6 @@ import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.AppendResult;
-import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
@@ -33,6 +32,7 @@ import
org.apache.rocketmq.tieredstore.provider.memory.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+import org.apache.rocketmq.common.BoundaryType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index dd9c6a9b45..f0a08dfb1a 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
@@ -122,6 +123,14 @@ public class DefaultMQAdminExt extends ClientConfig
implements MQAdminExt {
return defaultMQAdminExtImpl.searchOffset(mq, timestamp);
}
+ public long searchLowerBoundaryOffset(MessageQueue mq, long timestamp)
throws MQClientException {
+ return defaultMQAdminExtImpl.searchOffset(mq, timestamp,
BoundaryType.LOWER);
+ }
+
+ public long searchUpperBoundaryOffset(MessageQueue mq, long timestamp)
throws MQClientException {
+ return defaultMQAdminExtImpl.searchOffset(mq, timestamp,
BoundaryType.UPPER);
+ }
+
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return defaultMQAdminExtImpl.maxOffset(mq);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index c5c467bf00..fa3596d51c 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.namesrv.NamesrvUtil;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.RPCHook;
@@ -1700,6 +1701,10 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq,
timestamp);
}
+ public long searchOffset(MessageQueue mq, long timestamp, BoundaryType
boundaryType) throws MQClientException {
+ return this.mqClientInstance.getMQAdminImpl().searchOffset(mq,
timestamp, boundaryType);
+ }
+
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
diff --git
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
index b94754f22d..dc5642f88c 100644
---
a/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
+++
b/tools/src/test/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtTest.java
@@ -512,6 +512,36 @@ public class DefaultMQAdminExtTest {
assertThat(defaultMQAdminExt.searchOffset(new MessageQueue(TOPIC1,
BROKER1_NAME, 0), System.currentTimeMillis())).isEqualTo(101L);
}
+ @Test
+ public void testSearchOffsetWithSpecificBoundaryType() throws Exception {
+ // do mock
+ DefaultMQAdminExt mockDefaultMQAdminExt =
mock(DefaultMQAdminExt.class);
+
when(mockDefaultMQAdminExt.minOffset(any(MessageQueue.class))).thenReturn(0L);
+
when(mockDefaultMQAdminExt.maxOffset(any(MessageQueue.class))).thenReturn(101L);
+
when(mockDefaultMQAdminExt.searchLowerBoundaryOffset(any(MessageQueue.class),
anyLong())).thenReturn(0L);
+
when(mockDefaultMQAdminExt.searchUpperBoundaryOffset(any(MessageQueue.class),
anyLong())).thenReturn(100L);
+ when(mockDefaultMQAdminExt.queryConsumeTimeSpan(anyString(),
anyString())).thenReturn(mockQueryConsumeTimeSpan());
+
+ for (QueueTimeSpan timeSpan:
mockDefaultMQAdminExt.queryConsumeTimeSpan(TOPIC1, "group_one")) {
+ MessageQueue mq = timeSpan.getMessageQueue();
+ long maxOffset = mockDefaultMQAdminExt.maxOffset(mq);
+ long minOffset = mockDefaultMQAdminExt.minOffset(mq);
+ // if there is at least one message in queue, the maxOffset
returns the queue's latest offset + 1
+ assertThat((maxOffset == 0 ? 0 : maxOffset - 1) ==
mockDefaultMQAdminExt.searchUpperBoundaryOffset(mq,
timeSpan.getMaxTimeStamp())).isTrue();
+ assertThat(minOffset ==
mockDefaultMQAdminExt.searchLowerBoundaryOffset(mq,
timeSpan.getMinTimeStamp())).isTrue();
+ }
+ }
+
+ private List<QueueTimeSpan> mockQueryConsumeTimeSpan() {
+ List<QueueTimeSpan> spanSet = new ArrayList<>();
+ QueueTimeSpan timeSpan = new QueueTimeSpan();
+ timeSpan.setMessageQueue(new MessageQueue(TOPIC1, BROKER1_NAME, 0));
+ timeSpan.setMinTimeStamp(1690421253000L);
+ timeSpan.setMaxTimeStamp(1690507653000L);
+ spanSet.add(timeSpan);
+ return spanSet;
+ }
+
@Test
public void testExamineTopicConfig() throws MQBrokerException,
RemotingException, InterruptedException {
TopicConfig topicConfig =
defaultMQAdminExt.examineTopicConfig("127.0.0.1:10911",
"topic_test_examine_topicConfig");