vongosling closed pull request #129: [ROCKETMQ-239]fix query message by time
and fix get queue offset by time
URL: https://github.com/apache/rocketmq/pull/129
This pull request was merged from a forked repository.
Because GitHub does not show the original diff of a fork's pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
index 2a1044578..1aa2ad0ff 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java
@@ -20,12 +20,6 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.FileRegion;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
@@ -34,8 +28,7 @@
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageQueueForC;
import org.apache.rocketmq.common.protocol.RequestCode;
@@ -47,12 +40,21 @@
import
org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import
org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.SelectMappedBufferResult;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+
public class Broker2Client {
private static final InternalLogger log =
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final BrokerController brokerController;
@@ -151,7 +153,8 @@ public RemotingCommand resetOffset(String topic, String
group, long timeStamp, b
timeStampOffset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
- timeStampOffset =
this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i,
timeStamp);
+ timeStampOffset =
this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i,
timeStamp,
+
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
}
if (timeStampOffset < 0) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index f6f8a80af..6cddfc59a 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@ -107,8 +107,8 @@ public long getCommitLogOffsetInQueue(String topic, int
queueId, long consumeQue
}
@Override
- public long getOffsetInQueueByTime(String topic, int queueId, long
timestamp) {
- return next.getOffsetInQueueByTime(topic, queueId, timestamp);
+ public long getOffsetInQueueByTime(String topic, int queueId, long
timestamp, int getLastOrFirstOffset) {
+ return next.getOffsetInQueueByTime(topic, queueId, timestamp,
getLastOrFirstOffset);
}
@Override
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a9e54aa3e..e30183f4e 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -44,6 +44,7 @@
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageDecoder;
@@ -364,8 +365,13 @@ private RemotingCommand
searchOffsetByTimestamp(ChannelHandlerContext ctx,
final SearchOffsetRequestHeader requestHeader =
(SearchOffsetRequestHeader)
request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
+ int getLastOrFirstOffset =
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET;
+ if (requestHeader.getGetLastOrFirstOffset() != null) {
+ getLastOrFirstOffset = requestHeader.getGetLastOrFirstOffset();
+ }
+
long offset =
this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(),
requestHeader.getQueueId(),
- requestHeader.getTimestamp());
+ requestHeader.getTimestamp(), getLastOrFirstOffset);
responseHeader.setOffset(offset);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index cd7067030..0febd53e0 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -117,6 +117,11 @@ public long searchOffset(MessageQueue mq, long timestamp)
throws MQClientExcepti
return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp);
}
+ @Override
+ public long searchOffset(MessageQueue mq, long timestamp, int
getLastOrFirstOffset) throws MQClientException {
+ return this.defaultMQPullConsumerImpl.searchOffset(mq, timestamp,
getLastOrFirstOffset);
+ }
+
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPullConsumerImpl.maxOffset(mq);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index d51030a15..8f2345c44 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -300,6 +300,11 @@ public long searchOffset(MessageQueue mq, long timestamp)
throws MQClientExcepti
return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp);
}
+ @Override
+ public long searchOffset(MessageQueue mq, long timestamp, int
getLastOrFirstOffset) throws MQClientException {
+ return this.defaultMQPushConsumerImpl.searchOffset(mq, timestamp,
getLastOrFirstOffset);
+ }
+
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.defaultMQPushConsumerImpl.maxOffset(mq);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
index f4a8eda23..ec88e63ad 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQConsumer.java
@@ -48,4 +48,6 @@ void sendMessageBack(final MessageExt msg, final int
delayLevel, final String br
* @return queue set
*/
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic) throws
MQClientException;
+
+ long searchOffset(final MessageQueue mq, final long timestamp, final int
getLastOrFirstOffset) throws MQClientException;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 3d2df0f5d..fc00f1a1a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@ -34,6 +34,7 @@
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
@@ -167,6 +168,10 @@ public void createTopic(String key, String newTopic, int
queueNum, int topicSysF
}
public long searchOffset(MessageQueue mq, long timestamp) throws
MQClientException {
+ return this.doSearchOffset(mq, timestamp,
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
+ }
+
+ public long doSearchOffset(MessageQueue mq, long timestamp, int
getLastOrFirstOffset) throws MQClientException {
String brokerAddr =
this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
@@ -176,7 +181,7 @@ public long searchOffset(MessageQueue mq, long timestamp)
throws MQClientExcepti
if (brokerAddr != null) {
try {
return
this.mQClientFactory.getMQClientAPIImpl().searchOffset(brokerAddr,
mq.getTopic(), mq.getQueueId(), timestamp,
- timeoutMillis);
+ getLastOrFirstOffset, timeoutMillis);
} catch (Exception e) {
throw new MQClientException("Invoke Broker[" + brokerAddr + "]
exception", e);
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b07778499..580783bcf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -668,14 +668,14 @@ public MessageExt viewMessage(final String addr, final
long phyoffset, final lon
throw new MQBrokerException(response.getCode(), response.getRemark());
}
-
- public long searchOffset(final String addr, final String topic, final int
queueId, final long timestamp,
- final long timeoutMillis)
+
+ public long searchOffset(final String addr, final String topic, final int
queueId, final long timestamp, final int getLastOrFirstOffset, final long
timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
SearchOffsetRequestHeader requestHeader = new
SearchOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(queueId);
requestHeader.setTimestamp(timestamp);
+ requestHeader.setGetLastOrFirstOffset(getLastOrFirstOffset);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP,
requestHeader);
RemotingCommand response =
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
addr),
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 420d89b2f..b1c3f6040 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -16,13 +16,6 @@
*/
package org.apache.rocketmq.client.impl.consumer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
@@ -45,10 +38,10 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
@@ -59,10 +52,19 @@
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
public class DefaultMQPullConsumerImpl implements MQConsumerInner {
private final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPullConsumer defaultMQPullConsumer;
@@ -471,8 +473,12 @@ public MessageExt queryMessageByUniqKey(String topic,
String uniqKey)
}
public long searchOffset(MessageQueue mq, long timestamp) throws
MQClientException {
+ return this.searchOffset(mq, timestamp,
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
+ }
+
+ public long searchOffset(final MessageQueue mq, final long timestamp,
final int getLastOrFirstOffset) throws MQClientException {
this.makeSureStateOK();
- return this.mQClientFactory.getMQAdminImpl().searchOffset(mq,
timestamp);
+ return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq,
timestamp, getLastOrFirstOffset);
}
public void sendMessageBack(MessageExt msg, int delayLevel, final String
brokerName)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 85a40a9dd..dc2ba66db 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -53,6 +53,7 @@
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl;
@@ -956,7 +957,11 @@ public void resetOffsetByTimeStamp(long timeStamp)
}
public long searchOffset(MessageQueue mq, long timestamp) throws
MQClientException {
- return this.mQClientFactory.getMQAdminImpl().searchOffset(mq,
timestamp);
+ return this.searchOffset(mq, timestamp,
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
+ }
+
+ public long searchOffset(MessageQueue mq, long timestamp, int
getLastOrFirstOffset) throws MQClientException {
+ return this.mQClientFactory.getMQAdminImpl().doSearchOffset(mq,
timestamp, getLastOrFirstOffset);
}
@Override
diff --git
a/common/src/main/java/org/apache/rocketmq/common/constant/OffsetConstant.java
b/common/src/main/java/org/apache/rocketmq/common/constant/OffsetConstant.java
new file mode 100644
index 000000000..0b0e1100a
--- /dev/null
+++
b/common/src/main/java/org/apache/rocketmq/common/constant/OffsetConstant.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.constant;
+
+public class OffsetConstant {
+
+ public static final int SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET =
1;
+
+ public static final int SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET = 2;
+
+}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
index 5ea2e24bf..21807f1a8 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
@@ -32,6 +32,8 @@
@CFNotNull
private Long timestamp;
+ private Integer getLastOrFirstOffset;
+
@Override
public void checkFields() throws RemotingCommandException {
@@ -61,4 +63,11 @@ public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
+ public Integer getGetLastOrFirstOffset() {
+ return getLastOrFirstOffset;
+ }
+
+ public void setGetLastOrFirstOffset(Integer getLastOrFirstOffset) {
+ this.getLastOrFirstOffset = getLastOrFirstOffset;
+ }
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 08c7f9990..8a6aa4e3c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
@@ -151,13 +152,59 @@ public void recover() {
}
}
- public long getOffsetInQueueByTime(final long timestamp) {
- MappedFile mappedFile =
this.mappedFileQueue.getMappedFileByTime(timestamp);
+ private MappedFile getConsumeQueueMappedFileByStoreTime(long
messageStoreTime,int getLastOrFirstOffset) {
+ long commitLogMinOffset = this.defaultMessageStore.getMinPhyOffset();
+ List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
+ MappedFile resultFile = null;
+ for (MappedFile mappedFile : mappedFiles) {
+ SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
+ if (sbr != null) {
+ try {
+ ByteBuffer byteBuffer = sbr.getByteBuffer();
+ int max = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
+ int min = minLogicOffset > mappedFile.getFileFromOffset()
? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
+ byteBuffer.position(min);
+ long minPhyOffset = byteBuffer.getLong();
+ int minSize = byteBuffer.getInt();
+
+ byteBuffer.position(max);
+ long maxPhyOffset = byteBuffer.getLong();
+ if (maxPhyOffset < commitLogMinOffset) {
+ continue;
+ }
+ int maxSize = byteBuffer.getInt();
+
+ long minStoreTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(minPhyOffset,
minSize);
+ if (messageStoreTime < minStoreTime) {
+ return resultFile;
+ }
+
+ long maxStoreTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(maxPhyOffset,
maxSize);
+ if (maxStoreTime < messageStoreTime) {
+ continue;
+ }
+ if (getLastOrFirstOffset ==
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET) {
+ resultFile = mappedFile;
+ } else {
+ resultFile = mappedFile;
+ break;
+ }
+ } finally {
+ sbr.release();
+ }
+ }
+ }
+ return resultFile;
+ }
+
+ public long getOffsetInQueueByTime(final long timestamp, int
getLastOrFirstOffset) {
+ MappedFile mappedFile =
getConsumeQueueMappedFileByStoreTime(timestamp, getLastOrFirstOffset);
if (mappedFile != null) {
long offset = 0;
int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int)
(minLogicOffset - mappedFile.getFileFromOffset()) : 0;
int high = 0;
int midOffset = -1, targetOffset = -1, leftOffset = -1,
rightOffset = -1;
+ long lastTime = 0;
long leftIndexValue = -1L, rightIndexValue = -1L;
long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
@@ -176,8 +223,8 @@ public long getOffsetInQueueByTime(final long timestamp) {
continue;
}
- long storeTime =
-
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+ long storeTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+ lastTime = storeTime;
if (storeTime < 0) {
return 0;
} else if (storeTime == timestamp) {
@@ -195,8 +242,7 @@ public long getOffsetInQueueByTime(final long timestamp) {
}
if (targetOffset != -1) {
-
- offset = targetOffset;
+ offset =
getFixedFirstOrLastOffsetByTime(mappedFile,byteBuffer,targetOffset,getLastOrFirstOffset,minPhysicOffset,timestamp);
} else {
if (leftIndexValue == -1) {
@@ -205,10 +251,13 @@ public long getOffsetInQueueByTime(final long timestamp) {
offset = leftOffset;
} else {
- offset =
- Math.abs(timestamp - leftIndexValue) >
Math.abs(timestamp
- - rightIndexValue) ? rightOffset :
leftOffset;
+ if (getLastOrFirstOffset ==
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET) {
+ offset = leftOffset;
+ } else {
+ offset = rightOffset;
+ }
}
+ offset =
getFixedFirstOrLastOffsetByTime(mappedFile,byteBuffer,(int)offset,getLastOrFirstOffset,minPhysicOffset,lastTime);
}
return (mappedFile.getFileFromOffset() + offset) /
CQ_STORE_UNIT_SIZE;
@@ -220,6 +269,45 @@ public long getOffsetInQueueByTime(final long timestamp) {
return 0;
}
+ private long getStoreTimeStamp(ByteBuffer byteBuffer, int targetOffset) {
+ byteBuffer.position(targetOffset);
+ long phyOffset = byteBuffer.getLong();
+ int size = byteBuffer.getInt();
+ return
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+ }
+
+ private int getFixedFirstOrLastOffsetByTime(MappedFile mappedFile,
ByteBuffer byteBuffer, int targetOffset, int getLastOrFirstOffset, long
minPhysicOffset, long timestamp) {
+ int candidateOffset = targetOffset;
+ int fixedLow = minLogicOffset > mappedFile.getFileFromOffset() ? (int)
(minLogicOffset - mappedFile.getFileFromOffset()) : 0;
+ int fixedOffset = candidateOffset;
+ while (fixedOffset >= fixedLow) {
+ byteBuffer.position(fixedOffset);
+ long phyOffset = byteBuffer.getLong();
+ int size = byteBuffer.getInt();
+ if (phyOffset < minPhysicOffset) {
+ break;
+ }
+ if (getLastOrFirstOffset ==
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET) {
+ long storeTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+ if (storeTime < timestamp) {
+ break;
+ } else {
+ candidateOffset = fixedOffset;
+ fixedOffset = candidateOffset - CQ_STORE_UNIT_SIZE;
+ }
+ } else {
+ long storeTime =
this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
+ if (storeTime > timestamp) {
+ break;
+ } else {
+ candidateOffset = fixedOffset;
+ fixedOffset = candidateOffset + CQ_STORE_UNIT_SIZE;
+ }
+ }
+ }
+ return candidateOffset;
+ }
+
public void truncateDirtyLogicFiles(long phyOffet) {
int logicFileSize = this.mappedFileSize;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 4fc7412b1..436100820 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -640,10 +640,10 @@ public long getCommitLogOffsetInQueue(String topic, int
queueId, long consumeQue
return 0;
}
- public long getOffsetInQueueByTime(String topic, int queueId, long
timestamp) {
+ public long getOffsetInQueueByTime(String topic, int queueId, long
timestamp, int getLastOrFirstOffset) {
ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
if (logic != null) {
- return logic.getOffsetInQueueByTime(timestamp);
+ return logic.getOffsetInQueueByTime(timestamp,
getLastOrFirstOffset);
}
return 0;
@@ -821,10 +821,11 @@ public void executeDeleteFilesManually() {
public QueryMessageResult queryMessage(String topic, String key, int
maxNum, long begin, long end) {
QueryMessageResult queryMessageResult = new QueryMessageResult();
- long lastQueryMsgTime = end;
+ long fixedBeginTime = begin - 1001;
+ long lastQueryMsgTime = end + 1;
for (int i = 0; i < 3; i++) {
- QueryOffsetResult queryOffsetResult =
this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
+ QueryOffsetResult queryOffsetResult =
this.indexService.queryOffset(topic, key, maxNum, fixedBeginTime,
lastQueryMsgTime);
if (queryOffsetResult.getPhyOffsets().isEmpty()) {
break;
}
@@ -839,33 +840,28 @@ public QueryMessageResult queryMessage(String topic,
String key, int maxNum, lon
try {
- boolean match = true;
- MessageExt msg = this.lookMessageByOffset(offset);
- if (0 == m) {
- lastQueryMsgTime = msg.getStoreTimestamp();
- }
+ SelectMappedBufferResult result =
this.commitLog.getData(offset, false);
+ if (result != null) {
+ int size = result.getByteBuffer().getInt(0);
+ result.getByteBuffer().limit(size);
+ result.setSize(size);
+ long storeTime = getMessageStoreTimestamp(result);
+
+ boolean keyMatch = true;
+ // Key matching is not verified yet, so index hash collisions on the key are currently ignored
+ if (keyMatch) {
+ if (storeTime >= begin && storeTime <= end) {
+ queryMessageResult.addMessage(result);
+ }
+ } else {
+ log.warn("queryMessage hash duplicate, {} {}",
topic, key);
+ }
-// String[] keyArray =
msg.getKeys().split(MessageConst.KEY_SEPARATOR);
-// if (topic.equals(msg.getTopic())) {
-// for (String k : keyArray) {
-// if (k.equals(key)) {
-// match = true;
-// break;
-// }
-// }
-// }
-
- if (match) {
- SelectMappedBufferResult result =
this.commitLog.getData(offset, false);
- if (result != null) {
- int size = result.getByteBuffer().getInt(0);
- result.getByteBuffer().limit(size);
- result.setSize(size);
- queryMessageResult.addMessage(result);
+ if (0 == m) {
+ lastQueryMsgTime = storeTime;
}
- } else {
- log.warn("queryMessage hash duplicate, {} {}", topic,
key);
}
+
} catch (Exception e) {
log.error("queryMessage exception", e);
}
@@ -875,7 +871,7 @@ public QueryMessageResult queryMessage(String topic, String
key, int maxNum, lon
break;
}
- if (lastQueryMsgTime < begin) {
+ if (lastQueryMsgTime < fixedBeginTime) {
break;
}
}
@@ -883,6 +879,10 @@ public QueryMessageResult queryMessage(String topic,
String key, int maxNum, lon
return queryMessageResult;
}
+ private long getMessageStoreTimestamp(SelectMappedBufferResult message) {
+ return
message.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
+ }
+
@Override
public void updateHaMasterAddress(String newAddr) {
this.haService.updateMasterAddress(newAddr);
@@ -1428,6 +1428,7 @@ public void dispatch(DispatchRequest request) {
@Override
public void dispatch(DispatchRequest request) {
+
if
(DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 907dfe209..529c379cb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@ -116,9 +116,10 @@ GetMessageResult getMessage(final String group, final
String topic, final int qu
* @param topic Topic of the message.
* @param queueId Queue ID.
* @param timestamp Timestamp to look up.
+ * @param getLastOrFirstOffset If there are many messages at the given
timestamp, return the first offset if getLastOrFirstOffset=1; otherwise return the last
offset.
* @return physical offset which matches.
*/
- long getOffsetInQueueByTime(final String topic, final int queueId, final
long timestamp);
+ long getOffsetInQueueByTime(final String topic, final int queueId, final
long timestamp, int getLastOrFirstOffset);
/**
* Look up the message by given commit log offset.
diff --git
a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 9269cdfa7..7ea5f7a54 100644
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@ -21,21 +21,29 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
import java.nio.channels.OverlappingFileLockException;
import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.OffsetConstant;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.common.UtilAll;
import org.junit.After;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.Before;
import org.junit.Test;
+import org.junit.Assert;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertTrue;
+
public class DefaultMessageStoreTest {
private final String StoreMessage = "Once, there was a chance for me!";
@@ -53,7 +61,7 @@ public void init() throws Exception {
messageStore = buildMessageStore();
boolean load = messageStore.load();
- assertTrue(load);
+ Assert.assertTrue(load);
messageStore.start();
}
@@ -71,7 +79,7 @@ public void test_repate_restart() throws Exception {
MessageStore master = new DefaultMessageStore(messageStoreConfig,
null, new MyMessageArrivingListener(), new BrokerConfig());
boolean load = master.load();
- assertTrue(load);
+ Assert.assertTrue(load);
try {
master.start();
@@ -107,6 +115,7 @@ public void testWriteAndRead() throws Exception {
long totalMsgs = 100;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
+
for (long i = 0; i < totalMsgs; i++) {
messageStore.putMessage(buildMessage());
}
@@ -139,6 +148,7 @@ public void testGroupCommit() throws Exception {
long totalMsgs = 10;
QUEUE_TOTAL = 1;
MessageBody = StoreMessage.getBytes();
+
for (long i = 0; i < totalMsgs; i++) {
messageStore.putMessage(buildMessage());
}
@@ -193,4 +203,125 @@ public void arriving(String topic, int queueId, long
logicOffset, long tagsCode,
byte[] filterBitMap, Map<String, String>
properties) {
}
}
+
+ @Test
+ public void testQueryByTime() throws Exception {
+ int totalMsgs = 100;
+ int randomIndex = new Random().nextInt(10) + 40;
+ QUEUE_TOTAL = 8;
+ String topic = "TimeTopic";
+ String keys = "testQueryByTime";
+ long now = System.currentTimeMillis();
+ MessageBody = StoreMessage.getBytes();
+ for (int i = 0; i < totalMsgs; i++) {
+ MessageExtBrokerInner messageExtBrokerInner = new
MessageExtBrokerInner();
+ messageExtBrokerInner.setBody(("time:" +
System.currentTimeMillis() + " index:" + i).getBytes());
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setKeys(keys);
+
messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) %
QUEUE_TOTAL);
+ messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis());
+ messageExtBrokerInner.setStoreHost(StoreHost);
+ messageExtBrokerInner.setBornHost(BornHost);
+
messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties()));
+ PutMessageResult putMessageResult =
messageStore.putMessage(messageExtBrokerInner);
+ if (i == randomIndex) {
+ now =
putMessageResult.getAppendMessageResult().getStoreTimestamp();
+ }
+ }
+ Thread.sleep(2000L);
+ long end = System.currentTimeMillis();
+ QueryMessageResult result = messageStore.queryMessage(topic, keys,
totalMsgs, now, end);
+ for (ByteBuffer byteBuffer : result.getMessageBufferList()) {
+ MessageExt messageExt = MessageDecoder.decode(byteBuffer);
+ }
+ int bufferTotalSize = result.getMessageBufferList().size();
+ result.release();
+ Assert.assertTrue(totalMsgs - randomIndex - 1 <= bufferTotalSize);
+ }
+
+ @Test
+ public void testQueueOffsetByTime() throws Exception {
+ long totalMsgs = 100;
+ QUEUE_TOTAL = 1;
+ String topic = "TimeTopic";
+ String keys = "testQueryByTime";
+ String consumerGroup = "testQueryByTime";
+ MessageBody = StoreMessage.getBytes();
+
+ TreeMap<Long, AtomicInteger> sameTimeCountCache = new TreeMap<>();
+ TreeMap<Long, AtomicInteger> sameTimeResultCache = new TreeMap<>();
+ long hlTime = System.currentTimeMillis();
+ long hlCount = 0;
+ long endTime = System.currentTimeMillis();
+ int endCount1 = 0;
+ int endCount2 = 0;
+ long start = 0;
+
+ for (long i = 0; i < totalMsgs; i++) {
+ if (i == totalMsgs - 20) {
+ Thread.sleep(1000);
+ hlTime = System.currentTimeMillis();
+ Thread.sleep(500);
+ }
+ if (i == totalMsgs - 10) {
+ endTime = System.currentTimeMillis();
+ }
+ MessageExtBrokerInner messageExtBrokerInner = new
MessageExtBrokerInner();
+ messageExtBrokerInner.setBody(("time:" +
System.currentTimeMillis() + " index:" + i).getBytes());
+ messageExtBrokerInner.setTopic(topic);
+ messageExtBrokerInner.setKeys(keys);
+
messageExtBrokerInner.setQueueId(Math.abs(QueueId.getAndIncrement()) %
QUEUE_TOTAL);
+ messageExtBrokerInner.setBornTimestamp(System.currentTimeMillis());
+ messageExtBrokerInner.setStoreHost(StoreHost);
+ messageExtBrokerInner.setBornHost(BornHost);
+
messageExtBrokerInner.setPropertiesString(MessageDecoder.messageProperties2String(messageExtBrokerInner.getProperties()));
+ PutMessageResult putMessageResult =
messageStore.putMessage(messageExtBrokerInner);
+ long storeTimestamp =
putMessageResult.getAppendMessageResult().getStoreTimestamp();
+ AtomicInteger count = sameTimeCountCache.get(storeTimestamp);
+ if (count == null) {
+ count = new AtomicInteger(0);
+ sameTimeCountCache.put(storeTimestamp, count);
+ }
+ count.incrementAndGet();
+ }
+ Thread.sleep(2000L);
+
+ Map.Entry<Long, AtomicInteger> timeCount =
sameTimeCountCache.lastEntry();
+ start = timeCount.getKey();
+ long offsetInQueueByTime = messageStore.getOffsetInQueueByTime(topic,
0, start, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
+ GetMessageResult testQueryByTime =
messageStore.getMessage(consumerGroup, topic, 0, offsetInQueueByTime, 20, null);
+
+ List<ByteBuffer> messageBufferList =
testQueryByTime.getMessageBufferList();
+ for (ByteBuffer byteBuffer : messageBufferList) {
+ MessageExt messageExt = MessageDecoder.decode(byteBuffer);
+ AtomicInteger cc =
sameTimeResultCache.get(messageExt.getStoreTimestamp());
+ if (cc == null) {
+ cc = new AtomicInteger(0);
+ sameTimeResultCache.put(messageExt.getStoreTimestamp(), cc);
+ }
+ cc.incrementAndGet();
+ }
+ testQueryByTime.release();
+
+ long hlOffset = messageStore.getOffsetInQueueByTime(topic, 0, hlTime,
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
+ GetMessageResult hlResult = messageStore.getMessage(consumerGroup,
topic, 0, hlOffset, 20, null);
+ hlCount = hlResult.getMessageCount();
+ hlResult.release();
+
+ long endOffset1 = messageStore.getOffsetInQueueByTime(topic, 0,
endTime, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
+ GetMessageResult endResult1 = messageStore.getMessage(consumerGroup,
topic, 0, endOffset1, 20, null);
+ endCount1 = endResult1.getMessageCount();
+ endResult1.release();
+
+ long endOffset2 = messageStore.getOffsetInQueueByTime(topic, 0,
endTime, OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET);
+ GetMessageResult endResult2 = messageStore.getMessage(consumerGroup,
topic, 0, endOffset2, 20, null);
+ endCount2 = endResult2.getMessageCount();
+ endResult2.release();
+ Assert.assertTrue(start > 0);
+ AtomicInteger cc = sameTimeCountCache.get(start);
+ AtomicInteger result = sameTimeResultCache.get(start);
+ Assert.assertEquals(cc.get(), result.get());
+ Assert.assertTrue(19 == hlCount || hlCount == 20);
+ Assert.assertTrue(endCount1 >= endCount2);
+ }
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index eb45de22f..127c21084 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -103,6 +103,11 @@ public long searchOffset(MessageQueue mq, long timestamp)
throws MQClientExcepti
return defaultMQAdminExtImpl.searchOffset(mq, timestamp);
}
+ @Override
+ public long searchOffset(MessageQueue mq, long timestamp,int
getLastOrFirstOffset) throws MQClientException {
+ return defaultMQAdminExtImpl.searchOffset(mq, timestamp,
getLastOrFirstOffset);
+ }
+
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return defaultMQAdminExtImpl.maxOffset(mq);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index bcd66669c..1dd189050 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -46,6 +46,7 @@
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
+import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientExt;
@@ -509,7 +510,7 @@ private RollbackStats resetOffsetConsumeOffset(String
brokerAddr, String consume
} else {
resetOffset =
this.mqClientInstance.getMQClientAPIImpl().searchOffset(brokerAddr,
queue.getTopic(), queue.getQueueId(), timestamp,
- timeoutMillis);
+
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_FIRST_OFFSET, timeoutMillis);
}
RollbackStats rollbackStats = new RollbackStats();
@@ -937,6 +938,11 @@ public long searchOffset(MessageQueue mq, long timestamp)
throws MQClientExcepti
return this.mqClientInstance.getMQAdminImpl().searchOffset(mq,
timestamp);
}
+ @Override
+ public long searchOffset(MessageQueue mq, long timestamp,int
getLastOrFirstOffset) throws MQClientException {
+ return this.mqClientInstance.getMQAdminImpl().doSearchOffset(mq,
timestamp, getLastOrFirstOffset);
+ }
+
@Override
public long maxOffset(MessageQueue mq) throws MQClientException {
return this.mqClientInstance.getMQAdminImpl().maxOffset(mq);
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 16b442757..e817ea16d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -259,4 +259,6 @@ QueryConsumeQueueResponseBody queryConsumeQueue(final
String brokerAddr,
final String topic, final int queueId,
final long index, final int count, final String consumerGroup)
throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQClientException;
+
+ long searchOffset(final MessageQueue mq, final long timestamp, final int
getLastOrFirstOffset) throws MQClientException;
}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
index 46c5f7473..3743e9db7 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java
@@ -17,13 +17,6 @@
package org.apache.rocketmq.tools.command.message;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -32,12 +25,21 @@
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
public class PrintMessageByQueueCommand implements SubCommand {
public static long timestampFormat(final String value) {
@@ -190,7 +192,7 @@ public void execute(CommandLine commandLine, Options
options, RPCHook rpcHook) t
if (commandLine.hasOption('e')) {
String timestampStr = commandLine.getOptionValue('e').trim();
long timeValue = timestampFormat(timestampStr);
- maxOffset = consumer.searchOffset(mq, timeValue);
+ maxOffset = consumer.searchOffset(mq, timeValue,
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET);
}
final Map<String, AtomicLong> tagCalmap = new HashMap<String,
AtomicLong>();
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
index b204f0abf..5a169c359 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageSubCommand.java
@@ -26,6 +26,7 @@
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.OffsetConstant;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
@@ -131,7 +132,7 @@ public void execute(CommandLine commandLine, Options
options, RPCHook rpcHook) t
if (commandLine.hasOption('e')) {
String timestampStr =
commandLine.getOptionValue('e').trim();
long timeValue = timestampFormat(timestampStr);
- maxOffset = consumer.searchOffset(mq, timeValue);
+ maxOffset = consumer.searchOffset(mq, timeValue,
OffsetConstant.SEARCH_OFFSET_BYTIME_RETURN_RETURN_LAST_OFFSET);
}
System.out.printf("minOffset=%s, maxOffset=%s, %s", minOffset,
maxOffset, mq);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services